From afbdd6f575353fdba58eed009011a78622ba5365 Mon Sep 17 00:00:00 2001 From: Nir Sonnenschein Date: Thu, 23 Oct 2025 15:21:22 +0300 Subject: [PATCH 1/2] Add torch.compile test conf to existing tests Add a pytest configuration to test existing tests using torch.compile. This should increase the torch.compile test coverage. Added to existing tests in-place to avoid code duplication. Signed-off-by: Nir Sonnenschein --- tests/unit/checkpoint/common.py | 5 +- .../unit/checkpoint/test_latest_checkpoint.py | 11 +- tests/unit/checkpoint/test_lr_scheduler.py | 11 +- tests/unit/checkpoint/test_moe_checkpoint.py | 6 +- tests/unit/checkpoint/test_other_optimizer.py | 24 ++-- tests/unit/checkpoint/test_pipeline.py | 6 +- tests/unit/checkpoint/test_shared_weights.py | 7 +- tests/unit/checkpoint/test_sparse.py | 7 +- tests/unit/checkpoint/test_tag_validation.py | 10 +- .../checkpoint/test_universal_checkpoint.py | 21 ++-- tests/unit/checkpoint/test_zero_optimizer.py | 119 ++++++++++++++---- .../inference/test_checkpoint_sharding.py | 11 +- tests/unit/inference/test_inference.py | 40 +++++- 13 files changed, 216 insertions(+), 62 deletions(-) diff --git a/tests/unit/checkpoint/common.py b/tests/unit/checkpoint/common.py index 0daa1b070850..6a1d5d72267c 100644 --- a/tests/unit/checkpoint/common.py +++ b/tests/unit/checkpoint/common.py @@ -174,11 +174,14 @@ def checkpoint_correctness_verification(config_dict, empty_tag=False, seq_dataloader=False, load_module_only=False, - dtype=None): + dtype=None, + compile_mode=False): if dtype is None: dtype = preferred_dtype() ds_model = create_deepspeed_model(config_dict=config_dict, model=models[0], base_optimizer=base_optimizers[0]) + if compile_mode: + ds_model.compile() if seq_dataloader: data_loader = sequence_dataloader(model=ds_model, diff --git a/tests/unit/checkpoint/test_latest_checkpoint.py b/tests/unit/checkpoint/test_latest_checkpoint.py index 5d795c4dadcf..cf9d6976d712 100644 --- a/tests/unit/checkpoint/test_latest_checkpoint.py 
+++ b/tests/unit/checkpoint/test_latest_checkpoint.py @@ -19,7 +19,8 @@ class TestLatestCheckpoint(DistributedTest): world_size = 1 - def test_existing_latest(self, tmpdir): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test_existing_latest(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -39,9 +40,11 @@ def test_existing_latest(self, tmpdir): load_optimizer_states=True, load_lr_scheduler_states=False, empty_tag=True, - dtype=torch.float) + dtype=torch.float, + compile_mode=compile_mode) - def test_missing_latest(self, tmpdir): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test_missing_latest(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -55,5 +58,7 @@ def test_missing_latest(self, tmpdir): hidden_dim = 10 model = SimpleModel(hidden_dim) model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + if compile_mode: + model.compile() # should be no-op, since latest doesn't exist model.load_checkpoint(tmpdir) diff --git a/tests/unit/checkpoint/test_lr_scheduler.py b/tests/unit/checkpoint/test_lr_scheduler.py index 6dd7e3279521..9bd9b5fd5815 100644 --- a/tests/unit/checkpoint/test_lr_scheduler.py +++ b/tests/unit/checkpoint/test_lr_scheduler.py @@ -15,12 +15,13 @@ import pytest +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage, use_cpu_offload', [(0, False), (1, False), (2, False), (2, True), (3, False), (3, True)]) class TestLRSchedulerCheckpoint(DistributedTest): world_size = 2 - def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): + def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") if get_accelerator().device_name() == 'cpu': @@ -70,9 +71,10 @@ def 
test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): hidden_dim, tmpdir, load_optimizer_states=False, - load_lr_scheduler_states=True) + load_lr_scheduler_states=True, + compile_mode=compile_mode) - def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): + def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") if get_accelerator().device_name() == 'cpu': @@ -117,4 +119,5 @@ def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): hidden_dim, tmpdir, load_optimizer_states=False, - load_lr_scheduler_states=False) + load_lr_scheduler_states=False, + compile_mode=compile_mode) diff --git a/tests/unit/checkpoint/test_moe_checkpoint.py b/tests/unit/checkpoint/test_moe_checkpoint.py index 89878b5d8fa9..ed24f2818906 100644 --- a/tests/unit/checkpoint/test_moe_checkpoint.py +++ b/tests/unit/checkpoint/test_moe_checkpoint.py @@ -38,8 +38,9 @@ def test_checkpoint_moe(self, tmpdir, ep_size): seq_dataloader=True, dtype=torch.float16) + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize("ep_size, load_optim_states", [(4, True), (4, False), (2, True), (2, False)]) - def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states): + def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states, compile_mode): if not required_torch_version(min_version=1.8): pytest.skip("DeepSpeed MoE tests need torch 1.8 or higher to run correctly") @@ -80,4 +81,5 @@ def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states): empty_tag=True, base_optimizers=optimizers, seq_dataloader=True, - dtype=torch.float16) + dtype=torch.float16, + compile_mode=compile_mode) diff --git a/tests/unit/checkpoint/test_other_optimizer.py b/tests/unit/checkpoint/test_other_optimizer.py index 9d623260f1dd..a04fbb05cdf2 
100644 --- a/tests/unit/checkpoint/test_other_optimizer.py +++ b/tests/unit/checkpoint/test_other_optimizer.py @@ -18,7 +18,8 @@ class TestOtherOptimizerCheckpoint(DistributedTest): world_size = 2 @pytest.mark.skipif(not deepspeed.ops.__compatible_ops__[FusedLambBuilder.NAME], reason="lamb is not compatible") - def test_checkpoint_unfused_optimizer(self, tmpdir): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test_checkpoint_unfused_optimizer(self, tmpdir, compile_mode): #if not get_accelerator().is_fp16_supported(): # pytest.skip("fp16 is not supported") config_dict = { @@ -67,7 +68,8 @@ def test_checkpoint_unfused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=True, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) # Ignore optimizer states checkpoint_correctness_verification(config_dict, @@ -75,9 +77,11 @@ def test_checkpoint_unfused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=False, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) - def test_checkpoint_fused_optimizer(self, tmpdir): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test_checkpoint_fused_optimizer(self, tmpdir, compile_mode): if get_accelerator().device_name() == "cpu": pytest.skip("CPU accelerator does not support this test") config_dict = { @@ -108,7 +112,8 @@ def test_checkpoint_fused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=True, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) # Ignore optimizer states checkpoint_correctness_verification(config_dict, @@ -116,9 +121,11 @@ def test_checkpoint_fused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=False, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) - def test_checkpoint_fp32_optimizer(self, tmpdir): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test_checkpoint_fp32_optimizer(self, tmpdir, 
compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -143,4 +150,5 @@ def test_checkpoint_fp32_optimizer(self, tmpdir): models=models, hidden_dim=hidden_dim, tmpdir=tmpdir, - dtype=torch.float32) + dtype=torch.float32, + compile_mode=compile_mode) diff --git a/tests/unit/checkpoint/test_pipeline.py b/tests/unit/checkpoint/test_pipeline.py index c6c228ccada7..c90f5dbe1cf6 100644 --- a/tests/unit/checkpoint/test_pipeline.py +++ b/tests/unit/checkpoint/test_pipeline.py @@ -15,8 +15,9 @@ class TestPipelineCheckpoint(DistributedTest): world_size = 4 + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize("zero_stage", [0, 1]) - def test_checkpoint_pipe_engine(self, zero_stage, tmpdir): + def test_checkpoint_pipe_engine(self, zero_stage, tmpdir, compile_mode): skip_on_arch(min_arch=7) config_dict = { @@ -61,7 +62,8 @@ def test_checkpoint_pipe_engine(self, zero_stage, tmpdir): load_optimizer_states=True, load_lr_scheduler_states=True, train_batch=True, - dtype=torch.float16 if zero_stage > 0 else torch.float32) + dtype=torch.float16 if zero_stage > 0 else torch.float32, + compile_mode=compile_mode) @pytest.mark.parametrize( "base_topo,test_topo", diff --git a/tests/unit/checkpoint/test_shared_weights.py b/tests/unit/checkpoint/test_shared_weights.py index ed69073fb81c..bd2f1061e601 100644 --- a/tests/unit/checkpoint/test_shared_weights.py +++ b/tests/unit/checkpoint/test_shared_weights.py @@ -7,6 +7,7 @@ import torch.nn as nn import deepspeed +import pytest from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint from unit.common import DistributedTest @@ -25,7 +26,8 @@ def __init__(self): class TestCheckpointSharedWeights(DistributedTest): world_size = 2 - def test_checkpoint_shared_weights(self, tmp_path): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test_checkpoint_shared_weights(self, tmp_path, compile_mode): config = { "train_micro_batch_size_per_gpu": 2, 
"zero_allow_untested_optimizer": True, @@ -41,6 +43,9 @@ def test_checkpoint_shared_weights(self, tmp_path): model=model, optimizer=optimizer, ) + if compile_mode: + deepspeed_engine.compile() + filename = tmp_path / "checkpoint.pt" deepspeed_engine.save_checkpoint(filename, tag="checkpoint") diff --git a/tests/unit/checkpoint/test_sparse.py b/tests/unit/checkpoint/test_sparse.py index 19fbcd81e473..e2f0e1dc079b 100644 --- a/tests/unit/checkpoint/test_sparse.py +++ b/tests/unit/checkpoint/test_sparse.py @@ -24,8 +24,9 @@ class TestSparseCheckpoint(DistributedTest): [True, False], [True, True], ]) + @pytest.mark.parametrize('compile_mode', [True, False]) def test_non_strict_load_sparse(self, tmpdir, to_save_model_has_embedding, to_save_model_sparse, - destination_has_embedding, destination_sparse): + destination_has_embedding, destination_sparse, compile_mode): class ModelNoEmbedding(torch.nn.Module): @@ -66,6 +67,10 @@ def forward(self, x, offsets): "sparse_gradients": destination_sparse }) + if compile_mode: + engine_to_save.compile() + engine_destination.compile() + save_folder = os.path.join(tmpdir, 'saved_checkpoint') save_tag = '1' diff --git a/tests/unit/checkpoint/test_tag_validation.py b/tests/unit/checkpoint/test_tag_validation.py index b164c31e52b0..edbc42dcadf4 100644 --- a/tests/unit/checkpoint/test_tag_validation.py +++ b/tests/unit/checkpoint/test_tag_validation.py @@ -14,8 +14,9 @@ class TestCheckpointValidationTag(DistributedTest): world_size = 2 + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('valid_mode', ["FAIL", "WARN", "IGNORE"]) - def test_checkpoint_unique_tag(self, tmpdir, valid_mode): + def test_checkpoint_unique_tag(self, tmpdir, valid_mode, compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -33,13 +34,16 @@ def test_checkpoint_unique_tag(self, tmpdir, valid_mode): model = SimpleModel(hidden_dim) model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, 
model_parameters=model.parameters()) + if compile_mode: + model.compile() if valid_mode == "FAIL": with pytest.raises(AssertionError): model.save_checkpoint(save_dir=tmpdir, tag=f"tag-{dist.get_rank()}") else: model.save_checkpoint(save_dir=tmpdir, tag=f"tag-{dist.get_rank()}") - def test_checkpoint_unknown_tag_validation(self, tmpdir): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test_checkpoint_unknown_tag_validation(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, @@ -60,3 +64,5 @@ def test_checkpoint_unknown_tag_validation(self, tmpdir): with pytest.raises(deepspeed.DeepSpeedConfigError): model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + if compile_mode: + model.compile() diff --git a/tests/unit/checkpoint/test_universal_checkpoint.py b/tests/unit/checkpoint/test_universal_checkpoint.py index 27e151103cc4..aa10d27d4804 100644 --- a/tests/unit/checkpoint/test_universal_checkpoint.py +++ b/tests/unit/checkpoint/test_universal_checkpoint.py @@ -211,9 +211,10 @@ def update_gathered_stage3_optimizer(optimizer_state, param_shapes, world_size): @pytest.mark.parametrize("use_torch_adam", [False, True]) @pytest.mark.parametrize("load_optim", [False, True]) @pytest.mark.parametrize("sub_group_size", [-1, 100]) +@pytest.mark.parametrize('compile_mode', [True, False]) class TestZeROUniversalCheckpointDP(DistributedTest): - def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_size): + def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_size, compile_mode): if dtype == torch.bfloat16 and not bf16_required_version_check(): pytest.skip( " DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA > =11.0 and HW support for BFloat16 to run correctly" @@ -225,6 +226,9 @@ def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_ ds_config["checkpoint"] = {"load_universal": True} univ_model = 
SimpleModel(hidden_dim, nlayers=2) univ_model = init_ds_engine(univ_model, ds_config, use_torch_adam) + if compile_mode: + univ_model.compile() + univ_model.load_checkpoint(tmpdir, tag=f"{CP_TAG}_universal", load_optimizer_states=load_optim) model_state = univ_model.state_dict() @@ -260,13 +264,16 @@ def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_ univ_model.destroy() @pytest.mark.world_size(2) - def test_dp_world_size_2to2(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam): - self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2) + def test_dp_world_size_2to2(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam, + compile_mode): + self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2, compile_mode) @pytest.mark.world_size(2) - def test_dp_world_size_4to2(self, baseline_ws4, tmpdir, dtype, ds_config, load_optim, use_torch_adam): - self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2) + def test_dp_world_size_4to2(self, baseline_ws4, tmpdir, dtype, ds_config, load_optim, use_torch_adam, + compile_mode): + self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2, compile_mode) @pytest.mark.world_size(4) - def test_dp_world_size_2to4(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam): - self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 4) + def test_dp_world_size_2to4(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam, + compile_mode): + self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 4, compile_mode) diff --git a/tests/unit/checkpoint/test_zero_optimizer.py b/tests/unit/checkpoint/test_zero_optimizer.py index 85c38d7f5ffd..8a4a15360644 100644 --- a/tests/unit/checkpoint/test_zero_optimizer.py +++ b/tests/unit/checkpoint/test_zero_optimizer.py @@ -21,8 +21,9 @@ class TestZeROCheckpoint(DistributedTest): world_size = 2 + 
@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [3]) - def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage): + def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, "optimizer": { @@ -41,14 +42,19 @@ def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage): with deepspeed.zero.Init(config_dict_or_path=config_dict): models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_module_only=True, + compile_mode=compile_mode) - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True) - + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer', [(1, False, 'Adam'), (2, False, 'Adam'), (2, True, 'deepspeed_adam'), (3, False, 'Adam'), (3, True, 'deepspeed_adam')]) - def test_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer): + def test_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") @@ -82,13 +88,19 @@ def test_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_op else: models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=True) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_optimizer_states=True, + compile_mode=compile_mode) + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer', [(1, False, "Adam"), (2, False, "Adam"), (2, True, 'deepspeed_adam'), (3, False, 'Adam'), (3, True, 
'deepspeed_adam')]) - def test_not_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer): + def test_not_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") @@ -123,10 +135,16 @@ def test_not_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, ada else: models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=False) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_optimizer_states=False, + compile_mode=compile_mode) + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [1, 2]) - def test_hybrid_optimizer_state(self, tmpdir, zero_stage): + def test_hybrid_optimizer_state(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_micro_batch_size_per_gpu": 2, "gradient_accumulation_steps": 2, @@ -149,10 +167,12 @@ def test_hybrid_optimizer_state(self, tmpdir, zero_stage): base_optimizers=optimizers, hidden_dim=hidden_dim, tmpdir=tmpdir, - load_optimizer_states=True) + load_optimizer_states=True, + compile_mode=compile_mode) + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_load_module_only(self, tmpdir, zero_stage): + def test_load_module_only(self, tmpdir, zero_stage, compile_mode): if zero_stage == 0 and get_accelerator().device_name() == "cpu": pytest.skip("CPU Accelerator does not support this test") config_dict = { @@ -176,7 +196,12 @@ def test_load_module_only(self, tmpdir, zero_stage): else: models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True) + 
checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_module_only=True, + compile_mode=compile_mode) class ws4_model_checkpoint(DistributedFixture): @@ -212,13 +237,14 @@ def run(self, class_tmpdir, elastic_save, load_optim): model.save_checkpoint(class_tmpdir) +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize("elastic_save", [True, False]) @pytest.mark.parametrize("elastic_load", [True, False]) @pytest.mark.parametrize("load_optim", [True, False]) class TestZeROElasticCheckpoint(DistributedTest): world_size = 2 - def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, load_optim): + def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, load_optim, compile_mode): config_dict = { "train_batch_size": 2, "optimizer": { @@ -243,6 +269,8 @@ def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, l model, _, _, _ = deepspeed.initialize(config=config_dict, model=models[0], model_parameters=models[0].parameters()) + if compile_mode: + model.compile() run_steps = 8 data_loader = random_dataloader(model=model, total_samples=run_steps, @@ -261,6 +289,8 @@ def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, l model, _, _, _ = deepspeed.initialize(config=config_dict, model=models[1], model_parameters=models[1].parameters()) + if compile_mode: + model.compile() model.load_checkpoint(tmpdir, load_optimizer_states=load_optim) if load_optim: @@ -275,7 +305,7 @@ def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, l model.step() def test_elastic_checkpoint_change_dp(self, ws4_model_checkpoint, class_tmpdir, elastic_save, elastic_load, - load_optim): + load_optim, compile_mode): config_dict = { "train_batch_size": 4, "optimizer": { @@ -295,6 +325,8 @@ def test_elastic_checkpoint_change_dp(self, ws4_model_checkpoint, class_tmpdir, # Load checkpoint with dp world size = 2 model, _, _, _ = 
deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + if compile_mode: + model.compile() if load_optim: with pytest.raises(deepspeed.runtime.zero.utils.ZeRORuntimeException): model.load_checkpoint(class_tmpdir, load_optimizer_states=load_optim) @@ -302,11 +334,12 @@ def test_elastic_checkpoint_change_dp(self, ws4_model_checkpoint, class_tmpdir, model.load_checkpoint(class_tmpdir, load_optimizer_states=load_optim) +@pytest.mark.parametrize('compile_mode', [True, False]) class TestZeROSaveLoadEdgeCase(DistributedTest): world_size = 2 @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_immediate_save_load(self, tmpdir, zero_stage): + def test_immediate_save_load(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 4, "optimizer": { @@ -324,6 +357,8 @@ def test_immediate_save_load(self, tmpdir, zero_stage): model = SimpleModel(hidden_dim) ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() ds_model.save_checkpoint(tmpdir) ds_model.load_checkpoint(tmpdir, load_optimizer_states=False, @@ -331,7 +366,7 @@ def test_immediate_save_load(self, tmpdir, zero_stage): load_module_only=False) @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_load_immediate_save(self, tmpdir, zero_stage): + def test_load_immediate_save(self, tmpdir, zero_stage, compile_mode): if zero_stage == 0 and get_accelerator().device_name() == "cpu": pytest.skip("CPU Accelerator does not support this test") config_dict = { @@ -352,6 +387,8 @@ def test_load_immediate_save(self, tmpdir, zero_stage): # 1. 
pretrain a model and save it ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() data_loader = random_dataloader(model=ds_model, total_samples=1, hidden_dim=hidden_dim, device=ds_model.device) for _, batch in enumerate(data_loader): loss = ds_model(batch[0], batch[1]) @@ -363,6 +400,8 @@ def test_load_immediate_save(self, tmpdir, zero_stage): # 2. load and immediately save a model with a fresh ds engine ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() ds_model.load_checkpoint(tmpdir, load_optimizer_states=False, load_lr_scheduler_states=False, @@ -370,7 +409,7 @@ def test_load_immediate_save(self, tmpdir, zero_stage): ds_model.save_checkpoint(tmpdir) @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage): + def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage, compile_mode): config_dict = { "optimizer": { "type": 'Adam' @@ -395,6 +434,8 @@ def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage): # So we config grad_accum=2 and step only once and save_16bit_model ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() data_loader = random_dataloader(model=ds_model, total_samples=2, hidden_dim=hidden_dim, device=ds_model.device) batch = next(iter(data_loader)) @@ -411,11 +452,12 @@ def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage): ds_model.save_checkpoint(tmpdir) +@pytest.mark.parametrize('compile_mode', [True, False]) class TestZeROCheckpointFrozenWeights(DistributedTest): world_size = 2 @pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_load_optimizer_state(self, tmpdir, zero_stage): + def test_load_optimizer_state(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, @@ -443,10 +485,15 @@ 
def test_load_optimizer_state(self, tmpdir, zero_stage): with deepspeed.zero.Init(enabled=zero_stage == 3, config_dict_or_path=config_dict): models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=True) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_optimizer_states=True, + compile_mode=compile_mode) @pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_not_load_optimizer_state(self, tmpdir, zero_stage): + def test_not_load_optimizer_state(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, @@ -473,10 +520,15 @@ def test_not_load_optimizer_state(self, tmpdir, zero_stage): with deepspeed.zero.Init(enabled=zero_stage == 3, config_dict_or_path=config_dict): models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=False) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_optimizer_states=False, + compile_mode=compile_mode) @pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_load_module_only(self, tmpdir, zero_stage): + def test_load_module_only(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, "optimizer": { @@ -495,10 +547,15 @@ def test_load_module_only(self, tmpdir, zero_stage): with deepspeed.zero.Init(enabled=zero_stage == 3, config_dict_or_path=config_dict): models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_module_only=True, + compile_mode=compile_mode) @pytest.mark.parametrize('zero_stage', [1, 2]) - def test_save_exclude_frozen_weights(self, tmpdir, 
zero_stage): + def test_save_exclude_frozen_weights(self, tmpdir, zero_stage, compile_mode): world_size = 1 config_dict = { "train_micro_batch_size_per_gpu": 1, @@ -518,6 +575,8 @@ def test_save_exclude_frozen_weights(self, tmpdir, zero_stage): model = SimpleFrozenModel(hidden_dim, empty_grad=False) ds_engine, _, _, _ = deepspeed.initialize(model=model, model_parameters=model.parameters(), config=config_dict) + if compile_mode: + ds_engine.compile() # Validate backwards-compatibility of including frozen parameters in checkpoint all_ckpt_folder = os.path.join(tmpdir, 'all_params') @@ -546,7 +605,7 @@ def test_save_exclude_frozen_weights(self, tmpdir, zero_stage): assert loaded_trainable_param_names == trainable_param_names @pytest.mark.parametrize('zero_stage', [1, 2]) - def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage): + def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage, compile_mode): world_size = 1 config_dict = { "train_micro_batch_size_per_gpu": 1, @@ -566,6 +625,8 @@ def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage): model = SimpleFrozenModel(hidden_dim, empty_grad=False) ds_engine, _, _, _ = deepspeed.initialize(model=model, model_parameters=model.parameters(), config=config_dict) + if compile_mode: + ds_engine.compile() # Validate custom state_dict model state_dict_bk = model.state_dict @@ -590,9 +651,10 @@ def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage): class TestSaveTensorClone(DistributedTest): world_size = 1 + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [1, 2]) @pytest.mark.parametrize('use_cpu_device', [True, False]) - def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device): + def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device, compile_mode): config_dict = { "optimizer": { @@ -609,6 +671,8 @@ def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device): ref_model_state_dict = 
model.state_dict() ds_engine, _, _, _ = deepspeed.initialize(model=model, config_params=config_dict) + if compile_mode: + ds_engine.compile() clone_device = torch.device('cpu') if use_cpu_device else get_accelerator().current_device() clone_state_dict = clone_tensors_for_torch_save(ds_engine.module.state_dict()) compare_state_dicts(ref_model_state_dict, clone_state_dict) @@ -626,8 +690,9 @@ class TestZeRONonDistributed(DistributedTest): world_size = 1 init_distributed = False + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_chmod_exception_handling(self, monkeypatch, zero_stage): + def test_chmod_exception_handling(self, monkeypatch, zero_stage, compile_mode): config_dict = { "optimizer": { @@ -645,6 +710,8 @@ def test_chmod_exception_handling(self, monkeypatch, zero_stage): model=net, model_parameters=net.parameters()) + if compile_mode: + engine.compile() log_called = False def mock_logger_info(message, *args, **kwargs): diff --git a/tests/unit/inference/test_checkpoint_sharding.py b/tests/unit/inference/test_checkpoint_sharding.py index f1e37ee26536..1466be17e134 100644 --- a/tests/unit/inference/test_checkpoint_sharding.py +++ b/tests/unit/inference/test_checkpoint_sharding.py @@ -76,7 +76,9 @@ def run(self, model_name, class_tmpdir): class TestCheckpointShard(DistributedTest): world_size = 2 - def test(self, model_name, dtype, class_tmpdir, save_shard): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test(self, model_name, dtype, class_tmpdir, save_shard, compile_mode): + world_size = int(os.getenv("WORLD_SIZE", "1")) inf_config = { "replace_with_kernel_inject": True, @@ -95,6 +97,8 @@ def test(self, model_name, dtype, class_tmpdir, save_shard): model = AutoModelForCausalLM.from_config(model_config, torch_dtype=torch.bfloat16) model = model.eval() model = deepspeed.init_inference(model, config=inf_config) + if compile_mode: + model.compile() check_dtype(model, dtype) @@ 
-102,7 +106,8 @@ def test(self, model_name, dtype, class_tmpdir, save_shard): class TestCheckpointShardinAutoTP(DistributedTest): world_size = 2 - def test(self, model_name, class_tmpdir): + @pytest.mark.parametrize('compile_mode', [True, False]) + def test(self, model_name, class_tmpdir, compile_mode): def write_checkpoints_json(model_name, class_tmpdir): import json @@ -140,3 +145,5 @@ def write_checkpoints_json(model_name, class_tmpdir): model = AutoModelForCausalLM.from_config(model_config, torch_dtype=torch.bfloat16) model = model.eval() model = deepspeed.init_inference(model, config=inf_config) + if compile_mode: + model.compile() diff --git a/tests/unit/inference/test_inference.py b/tests/unit/inference/test_inference.py index 9337eb67ff1e..c7c5f0743a57 100644 --- a/tests/unit/inference/test_inference.py +++ b/tests/unit/inference/test_inference.py @@ -348,6 +348,7 @@ def validate_test(model_w_task, dtype, enable_cuda_graph, enable_triton): return msg +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.inference class TestModelTask(DistributedTest): world_size = 1 @@ -361,6 +362,7 @@ def test( query, inf_kwargs, assert_fn, + compile_mode, perf_meas=True, ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph, enable_triton) @@ -407,6 +409,8 @@ def test( ).max_out_tokens: args.update({'max_out_tokens': pipe.tokenizer.model_max_length}) pipe.model = deepspeed.init_inference(pipe.model, **args) + if compile_mode: + pipe.model.compile() check_injection(pipe.model) # Warm-up queries for perf measurement #for i in range(10): @@ -434,6 +438,7 @@ def test( assert assert_fn(bs_output, ds_output) +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.seq_inference @pytest.mark.parametrize("model_w_task", [("EleutherAI/gpt-neo-1.3B", "text-generation"), ("EleutherAI/gpt-neox-20b", "text-generation"), @@ -450,6 +455,7 @@ def test( query, inf_kwargs, assert_fn, + compile_mode, ): invalid_test_msg = 
validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False) if invalid_test_msg: @@ -470,6 +476,8 @@ def test( mp_size=self.world_size, dtype=dtype, replace_with_kernel_inject=True) + if compile_mode: + pipe.model.compile() check_injection(pipe.model) # Switch device to GPU so that input tensors are not on CPU pipe.device = torch.device(get_accelerator().device_name(local_rank)) @@ -480,6 +488,7 @@ def test( assert assert_fn(bs_output, ds_output) +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.inference @pytest.mark.parametrize("model_w_task", [("openai-community/gpt2", "text-generation")], ids=["gpt2"]) class TestLowCpuMemUsage(DistributedTest): @@ -491,6 +500,7 @@ def test( query, inf_kwargs, assert_fn, + compile_mode, ): model, task = model_w_task dtype = torch.float16 @@ -506,12 +516,14 @@ def test( dtype=dtype, replace_method="auto", replace_with_kernel_inject=True) - + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) assert assert_fn(bs_output, ds_output) +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.seq_inference @pytest.mark.parametrize( "model_w_task, injection_policy", @@ -528,7 +540,17 @@ def test( @pytest.mark.parametrize("dtype", [torch.float], ids=["fp32"]) class TestInjectionPolicy(DistributedTest): - def test(self, model_w_task, injection_policy, query, inf_kwargs, assert_fn, dtype, world_size): + def test( + self, + model_w_task, + injection_policy, + query, + inf_kwargs, + assert_fn, + dtype, + world_size, + compile_mode, + ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False) if invalid_test_msg: pytest.skip(invalid_test_msg) @@ -546,6 +568,8 @@ def test(self, model_w_task, injection_policy, query, inf_kwargs, assert_fn, dty mp_size=world_size, dtype=dtype, injection_policy=injection_policy) + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) print(local_rank, "baseline", 
bs_output) @@ -608,6 +632,7 @@ def test(self, model_w_task, dtype, query, inf_kwargs, assert_fn): # assert assert_fn(bs_output, ds_output) +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.seq_inference @pytest.mark.parametrize('keep_module_on_host', [True, False]) @pytest.mark.parametrize( @@ -626,6 +651,7 @@ def test( inf_kwargs, assert_fn, dtype, + compile_mode, keep_module_on_host, ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False) @@ -653,6 +679,8 @@ def test( mp_size=world_size, dtype=dtype, keep_module_on_host=keep_module_on_host) + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) print(local_rank, "baseline", bs_output) @@ -671,6 +699,7 @@ def test_odd_world_size( inf_kwargs, assert_fn, dtype, + compile_mode, keep_module_on_host, ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False) @@ -693,6 +722,8 @@ def test_odd_world_size( mp_size=world_size, dtype=dtype, keep_module_on_host=keep_module_on_host) + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) print(local_rank, "baseline", bs_output) @@ -704,6 +735,7 @@ def test_odd_world_size( assert param.device == torch.device('cpu'), f"keep_module_on_host is on but param {name} is not on cpu" +@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.nightly @pytest.mark.parametrize( "model_family, model_name", @@ -718,7 +750,7 @@ class TestLMCorrectness(DistributedTest): world_size = 1 exec_timeout = 1200 # Give these tests longer to complete - def test(self, model_family, model_name, task): + def test(self, model_family, model_name, task, compile_mode): # imports here to avoid import errors when pytest collects tests import lm_eval import lm_eval.models @@ -776,6 +808,8 @@ def no_pool_bootstrap_stderr(f, xs, iters): replace_with_kernel_inject=True, enable_cuda_graph=False, ) + if compile_mode: + ds_model.compile() 
check_injection(ds_model) setattr(lm, model_family, ds_model) get_accelerator().synchronize() From 6b7b8871d43c5151a0827653ce0d4439cba1a229 Mon Sep 17 00:00:00 2001 From: Nir Sonnenschein Date: Sun, 2 Nov 2025 15:04:01 +0200 Subject: [PATCH 2/2] allow disabling torch compile tests make torch compile test scenarios switchable using a pytest flag. if the flag --enable-compile-mode is not used the additional tests will not run (saving test cycle time) Signed-off-by: Nir Sonnenschein --- tests/conftest.py | 8 ++++++++ tests/unit/checkpoint/test_latest_checkpoint.py | 2 -- tests/unit/checkpoint/test_lr_scheduler.py | 1 - tests/unit/checkpoint/test_moe_checkpoint.py | 1 - tests/unit/checkpoint/test_other_optimizer.py | 3 --- tests/unit/checkpoint/test_pipeline.py | 1 - tests/unit/checkpoint/test_shared_weights.py | 2 -- tests/unit/checkpoint/test_sparse.py | 1 - tests/unit/checkpoint/test_tag_validation.py | 2 -- tests/unit/checkpoint/test_universal_checkpoint.py | 1 - tests/unit/checkpoint/test_zero_optimizer.py | 5 ----- tests/unit/inference/test_checkpoint_sharding.py | 2 -- tests/unit/inference/test_inference.py | 6 ------ 13 files changed, 8 insertions(+), 27 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 8137dfb74042..604173404c21 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,6 +31,7 @@ def pytest_configure(config): def pytest_addoption(parser): parser.addoption("--torch_ver", default=None, type=str) parser.addoption("--cuda_ver", default=None, type=str) + parser.addoption("--enable-compile-mode", action="store_true", help="Run both compiled/non-compiled versions") def validate_version(expected, found): @@ -70,6 +71,13 @@ def pytest_runtest_call(item): item.runtest = lambda: True # Dummy function so test is not run twice +def pytest_generate_tests(metafunc): + if "compile_mode" in metafunc.fixturenames: + compile_testing_enabled = metafunc.config.getoption("--enable-compile-mode") + params = [False, True] if
compile_testing_enabled else [False] + metafunc.parametrize("compile_mode", params) + + # We allow DistributedTest to reuse distributed environments. When the last # test for a class is run, we want to make sure those distributed environments # are destroyed. diff --git a/tests/unit/checkpoint/test_latest_checkpoint.py b/tests/unit/checkpoint/test_latest_checkpoint.py index cf9d6976d712..372e1b3bf247 100644 --- a/tests/unit/checkpoint/test_latest_checkpoint.py +++ b/tests/unit/checkpoint/test_latest_checkpoint.py @@ -19,7 +19,6 @@ class TestLatestCheckpoint(DistributedTest): world_size = 1 - @pytest.mark.parametrize('compile_mode', [True, False]) def test_existing_latest(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, @@ -43,7 +42,6 @@ def test_existing_latest(self, tmpdir, compile_mode): dtype=torch.float, compile_mode=compile_mode) - @pytest.mark.parametrize('compile_mode', [True, False]) def test_missing_latest(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, diff --git a/tests/unit/checkpoint/test_lr_scheduler.py b/tests/unit/checkpoint/test_lr_scheduler.py index 9bd9b5fd5815..5521abaedc49 100644 --- a/tests/unit/checkpoint/test_lr_scheduler.py +++ b/tests/unit/checkpoint/test_lr_scheduler.py @@ -15,7 +15,6 @@ import pytest -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage, use_cpu_offload', [(0, False), (1, False), (2, False), (2, True), (3, False), (3, True)]) class TestLRSchedulerCheckpoint(DistributedTest): diff --git a/tests/unit/checkpoint/test_moe_checkpoint.py b/tests/unit/checkpoint/test_moe_checkpoint.py index ed24f2818906..b128ebcb20e2 100644 --- a/tests/unit/checkpoint/test_moe_checkpoint.py +++ b/tests/unit/checkpoint/test_moe_checkpoint.py @@ -38,7 +38,6 @@ def test_checkpoint_moe(self, tmpdir, ep_size): seq_dataloader=True, dtype=torch.float16) - @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize("ep_size, load_optim_states", [(4, 
True), (4, False), (2, True), (2, False)]) def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states, compile_mode): if not required_torch_version(min_version=1.8): diff --git a/tests/unit/checkpoint/test_other_optimizer.py b/tests/unit/checkpoint/test_other_optimizer.py index a04fbb05cdf2..7e301492af25 100644 --- a/tests/unit/checkpoint/test_other_optimizer.py +++ b/tests/unit/checkpoint/test_other_optimizer.py @@ -18,7 +18,6 @@ class TestOtherOptimizerCheckpoint(DistributedTest): world_size = 2 @pytest.mark.skipif(not deepspeed.ops.__compatible_ops__[FusedLambBuilder.NAME], reason="lamb is not compatible") - @pytest.mark.parametrize('compile_mode', [True, False]) def test_checkpoint_unfused_optimizer(self, tmpdir, compile_mode): #if not get_accelerator().is_fp16_supported(): # pytest.skip("fp16 is not supported") @@ -80,7 +79,6 @@ def test_checkpoint_unfused_optimizer(self, tmpdir, compile_mode): dtype=dtype, compile_mode=compile_mode) - @pytest.mark.parametrize('compile_mode', [True, False]) def test_checkpoint_fused_optimizer(self, tmpdir, compile_mode): if get_accelerator().device_name() == "cpu": pytest.skip("CPU accelerator does not support this test") @@ -124,7 +122,6 @@ def test_checkpoint_fused_optimizer(self, tmpdir, compile_mode): dtype=dtype, compile_mode=compile_mode) - @pytest.mark.parametrize('compile_mode', [True, False]) def test_checkpoint_fp32_optimizer(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, diff --git a/tests/unit/checkpoint/test_pipeline.py b/tests/unit/checkpoint/test_pipeline.py index c90f5dbe1cf6..68065b730ef1 100644 --- a/tests/unit/checkpoint/test_pipeline.py +++ b/tests/unit/checkpoint/test_pipeline.py @@ -15,7 +15,6 @@ class TestPipelineCheckpoint(DistributedTest): world_size = 4 - @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize("zero_stage", [0, 1]) def test_checkpoint_pipe_engine(self, zero_stage, tmpdir, compile_mode): skip_on_arch(min_arch=7) diff --git 
a/tests/unit/checkpoint/test_shared_weights.py b/tests/unit/checkpoint/test_shared_weights.py index bd2f1061e601..084a1908201b 100644 --- a/tests/unit/checkpoint/test_shared_weights.py +++ b/tests/unit/checkpoint/test_shared_weights.py @@ -7,7 +7,6 @@ import torch.nn as nn import deepspeed -import pytest from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint from unit.common import DistributedTest @@ -26,7 +25,6 @@ def __init__(self): class TestCheckpointSharedWeights(DistributedTest): world_size = 2 - @pytest.mark.parametrize('compile_mode', [True, False]) def test_checkpoint_shared_weights(self, tmp_path, compile_mode): config = { "train_micro_batch_size_per_gpu": 2, diff --git a/tests/unit/checkpoint/test_sparse.py b/tests/unit/checkpoint/test_sparse.py index e2f0e1dc079b..7def94cf2764 100644 --- a/tests/unit/checkpoint/test_sparse.py +++ b/tests/unit/checkpoint/test_sparse.py @@ -24,7 +24,6 @@ class TestSparseCheckpoint(DistributedTest): [True, False], [True, True], ]) - @pytest.mark.parametrize('compile_mode', [True, False]) def test_non_strict_load_sparse(self, tmpdir, to_save_model_has_embedding, to_save_model_sparse, destination_has_embedding, destination_sparse, compile_mode): diff --git a/tests/unit/checkpoint/test_tag_validation.py b/tests/unit/checkpoint/test_tag_validation.py index edbc42dcadf4..b5aff05d9cb6 100644 --- a/tests/unit/checkpoint/test_tag_validation.py +++ b/tests/unit/checkpoint/test_tag_validation.py @@ -14,7 +14,6 @@ class TestCheckpointValidationTag(DistributedTest): world_size = 2 - @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('valid_mode', ["FAIL", "WARN", "IGNORE"]) def test_checkpoint_unique_tag(self, tmpdir, valid_mode, compile_mode): config_dict = { @@ -42,7 +41,6 @@ def test_checkpoint_unique_tag(self, tmpdir, valid_mode, compile_mode): else: model.save_checkpoint(save_dir=tmpdir, tag=f"tag-{dist.get_rank()}") - @pytest.mark.parametrize('compile_mode', [True, False]) 
def test_checkpoint_unknown_tag_validation(self, tmpdir, compile_mode): config_dict = { diff --git a/tests/unit/checkpoint/test_universal_checkpoint.py b/tests/unit/checkpoint/test_universal_checkpoint.py index aa10d27d4804..e5997f140bc9 100644 --- a/tests/unit/checkpoint/test_universal_checkpoint.py +++ b/tests/unit/checkpoint/test_universal_checkpoint.py @@ -211,7 +211,6 @@ def update_gathered_stage3_optimizer(optimizer_state, param_shapes, world_size): @pytest.mark.parametrize("use_torch_adam", [False, True]) @pytest.mark.parametrize("load_optim", [False, True]) @pytest.mark.parametrize("sub_group_size", [-1, 100]) -@pytest.mark.parametrize('compile_mode', [True, False]) class TestZeROUniversalCheckpointDP(DistributedTest): def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_size, compile_mode): diff --git a/tests/unit/checkpoint/test_zero_optimizer.py b/tests/unit/checkpoint/test_zero_optimizer.py index 8a4a15360644..96646c4388e0 100644 --- a/tests/unit/checkpoint/test_zero_optimizer.py +++ b/tests/unit/checkpoint/test_zero_optimizer.py @@ -21,7 +21,6 @@ class TestZeROCheckpoint(DistributedTest): world_size = 2 - @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [3]) def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage, compile_mode): config_dict = { @@ -170,7 +169,6 @@ def test_hybrid_optimizer_state(self, tmpdir, zero_stage, compile_mode): load_optimizer_states=True, compile_mode=compile_mode) - @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) def test_load_module_only(self, tmpdir, zero_stage, compile_mode): if zero_stage == 0 and get_accelerator().device_name() == "cpu": @@ -237,7 +235,6 @@ def run(self, class_tmpdir, elastic_save, load_optim): model.save_checkpoint(class_tmpdir) -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize("elastic_save", [True, False]) 
@pytest.mark.parametrize("elastic_load", [True, False]) @pytest.mark.parametrize("load_optim", [True, False]) @@ -334,7 +331,6 @@ def test_elastic_checkpoint_change_dp(self, ws4_model_checkpoint, class_tmpdir, model.load_checkpoint(class_tmpdir, load_optimizer_states=load_optim) -@pytest.mark.parametrize('compile_mode', [True, False]) class TestZeROSaveLoadEdgeCase(DistributedTest): world_size = 2 @@ -651,7 +647,6 @@ def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage, compile_mo class TestSaveTensorClone(DistributedTest): world_size = 1 - @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [1, 2]) @pytest.mark.parametrize('use_cpu_device', [True, False]) def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device, compile_mode): diff --git a/tests/unit/inference/test_checkpoint_sharding.py b/tests/unit/inference/test_checkpoint_sharding.py index 1466be17e134..9c99892e93b6 100644 --- a/tests/unit/inference/test_checkpoint_sharding.py +++ b/tests/unit/inference/test_checkpoint_sharding.py @@ -76,7 +76,6 @@ def run(self, model_name, class_tmpdir): class TestCheckpointShard(DistributedTest): world_size = 2 - @pytest.mark.parametrize('compile_mode', [True, False]) def test(self, model_name, dtype, class_tmpdir, save_shard, compile_mode): world_size = int(os.getenv("WORLD_SIZE", "1")) @@ -106,7 +105,6 @@ def test(self, model_name, dtype, class_tmpdir, save_shard, compile_mode): class TestCheckpointShardinAutoTP(DistributedTest): world_size = 2 - @pytest.mark.parametrize('compile_mode', [True, False]) def test(self, model_name, class_tmpdir, compile_mode): def write_checkpoints_json(model_name, class_tmpdir): diff --git a/tests/unit/inference/test_inference.py b/tests/unit/inference/test_inference.py index c7c5f0743a57..931b574601bb 100644 --- a/tests/unit/inference/test_inference.py +++ b/tests/unit/inference/test_inference.py @@ -348,7 +348,6 @@ def validate_test(model_w_task, dtype, enable_cuda_graph, 
enable_triton): return msg -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.inference class TestModelTask(DistributedTest): world_size = 1 @@ -438,7 +437,6 @@ def test( assert assert_fn(bs_output, ds_output) -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.seq_inference @pytest.mark.parametrize("model_w_task", [("EleutherAI/gpt-neo-1.3B", "text-generation"), ("EleutherAI/gpt-neox-20b", "text-generation"), @@ -488,7 +486,6 @@ def test( assert assert_fn(bs_output, ds_output) -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.inference @pytest.mark.parametrize("model_w_task", [("openai-community/gpt2", "text-generation")], ids=["gpt2"]) class TestLowCpuMemUsage(DistributedTest): @@ -523,7 +520,6 @@ def test( assert assert_fn(bs_output, ds_output) -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.seq_inference @pytest.mark.parametrize( "model_w_task, injection_policy", @@ -632,7 +628,6 @@ def test(self, model_w_task, dtype, query, inf_kwargs, assert_fn): # assert assert_fn(bs_output, ds_output) -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.seq_inference @pytest.mark.parametrize('keep_module_on_host', [True, False]) @pytest.mark.parametrize( @@ -735,7 +730,6 @@ def test_odd_world_size( assert param.device == torch.device('cpu'), f"keep_module_on_host is on but param {name} is not on cpu" -@pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.nightly @pytest.mark.parametrize( "model_family, model_name",