diff --git a/tests/conftest.py b/tests/conftest.py index 8137dfb74042..604173404c21 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,6 +31,7 @@ def pytest_configure(config): def pytest_addoption(parser): parser.addoption("--torch_ver", default=None, type=str) parser.addoption("--cuda_ver", default=None, type=str) + parser.addoption("--enable-compile-mode", action="store_true", help="Run both compiled/non-compiled versions") def validate_version(expected, found): @@ -70,6 +71,24 @@ def pytest_runtest_call(item): item.runtest = lambda: True # Dummy function so test is not run twice +def pytest_generate_tests(metafunc): + """Parametrize ``compile_mode`` for tests that request it and do not parametrize it themselves.""" + if "compile_mode" not in metafunc.fixturenames: + return + # Tests carrying their own @pytest.mark.parametrize('compile_mode', ...) are left alone: + # parametrizing the same argument a second time is a collection error in pytest. + for marker in metafunc.definition.iter_markers(name="parametrize"): + argnames = marker.args[0] if marker.args else marker.kwargs.get("argnames", ()) + if isinstance(argnames, str): + argnames = [name.strip() for name in argnames.split(",")] + if "compile_mode" in argnames: + return + # Without --enable-compile-mode only the non-compiled variant is generated. + compile_testing_enabled = metafunc.config.getoption("--enable-compile-mode") + params = [False, True] if compile_testing_enabled else [False] + metafunc.parametrize("compile_mode", params) + + # We allow DistributedTest to reuse distributed environments. When the last # test for a class is run, we want to make sure those distributed environments # are destroyed. 
diff --git a/tests/unit/checkpoint/common.py b/tests/unit/checkpoint/common.py index 0daa1b070850..6a1d5d72267c 100644 --- a/tests/unit/checkpoint/common.py +++ b/tests/unit/checkpoint/common.py @@ -174,11 +174,14 @@ def checkpoint_correctness_verification(config_dict, empty_tag=False, seq_dataloader=False, load_module_only=False, - dtype=None): + dtype=None, + compile_mode=False): if dtype is None: dtype = preferred_dtype() ds_model = create_deepspeed_model(config_dict=config_dict, model=models[0], base_optimizer=base_optimizers[0]) + if compile_mode: + ds_model.compile() if seq_dataloader: data_loader = sequence_dataloader(model=ds_model, diff --git a/tests/unit/checkpoint/test_latest_checkpoint.py b/tests/unit/checkpoint/test_latest_checkpoint.py index 5d795c4dadcf..372e1b3bf247 100644 --- a/tests/unit/checkpoint/test_latest_checkpoint.py +++ b/tests/unit/checkpoint/test_latest_checkpoint.py @@ -19,7 +19,7 @@ class TestLatestCheckpoint(DistributedTest): world_size = 1 - def test_existing_latest(self, tmpdir): + def test_existing_latest(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -39,9 +39,10 @@ def test_existing_latest(self, tmpdir): load_optimizer_states=True, load_lr_scheduler_states=False, empty_tag=True, - dtype=torch.float) + dtype=torch.float, + compile_mode=compile_mode) - def test_missing_latest(self, tmpdir): + def test_missing_latest(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -55,5 +56,7 @@ def test_missing_latest(self, tmpdir): hidden_dim = 10 model = SimpleModel(hidden_dim) model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + if compile_mode: + model.compile() # should be no-op, since latest doesn't exist model.load_checkpoint(tmpdir) diff --git a/tests/unit/checkpoint/test_lr_scheduler.py b/tests/unit/checkpoint/test_lr_scheduler.py index 6dd7e3279521..5521abaedc49 100644 --- 
a/tests/unit/checkpoint/test_lr_scheduler.py +++ b/tests/unit/checkpoint/test_lr_scheduler.py @@ -20,7 +20,7 @@ class TestLRSchedulerCheckpoint(DistributedTest): world_size = 2 - def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): + def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") if get_accelerator().device_name() == 'cpu': @@ -70,9 +70,10 @@ def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): hidden_dim, tmpdir, load_optimizer_states=False, - load_lr_scheduler_states=True) + load_lr_scheduler_states=True, + compile_mode=compile_mode) - def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): + def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") if get_accelerator().device_name() == 'cpu': @@ -117,4 +118,5 @@ def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload): hidden_dim, tmpdir, load_optimizer_states=False, - load_lr_scheduler_states=False) + load_lr_scheduler_states=False, + compile_mode=compile_mode) diff --git a/tests/unit/checkpoint/test_moe_checkpoint.py b/tests/unit/checkpoint/test_moe_checkpoint.py index 89878b5d8fa9..b128ebcb20e2 100644 --- a/tests/unit/checkpoint/test_moe_checkpoint.py +++ b/tests/unit/checkpoint/test_moe_checkpoint.py @@ -39,7 +39,7 @@ def test_checkpoint_moe(self, tmpdir, ep_size): dtype=torch.float16) @pytest.mark.parametrize("ep_size, load_optim_states", [(4, True), (4, False), (2, True), (2, False)]) - def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states): + def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states, compile_mode): if not 
required_torch_version(min_version=1.8): pytest.skip("DeepSpeed MoE tests need torch 1.8 or higher to run correctly") @@ -80,4 +80,5 @@ def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states): empty_tag=True, base_optimizers=optimizers, seq_dataloader=True, - dtype=torch.float16) + dtype=torch.float16, + compile_mode=compile_mode) diff --git a/tests/unit/checkpoint/test_other_optimizer.py b/tests/unit/checkpoint/test_other_optimizer.py index 9d623260f1dd..7e301492af25 100644 --- a/tests/unit/checkpoint/test_other_optimizer.py +++ b/tests/unit/checkpoint/test_other_optimizer.py @@ -18,7 +18,7 @@ class TestOtherOptimizerCheckpoint(DistributedTest): world_size = 2 @pytest.mark.skipif(not deepspeed.ops.__compatible_ops__[FusedLambBuilder.NAME], reason="lamb is not compatible") - def test_checkpoint_unfused_optimizer(self, tmpdir): + def test_checkpoint_unfused_optimizer(self, tmpdir, compile_mode): #if not get_accelerator().is_fp16_supported(): # pytest.skip("fp16 is not supported") config_dict = { @@ -67,7 +67,8 @@ def test_checkpoint_unfused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=True, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) # Ignore optimizer states checkpoint_correctness_verification(config_dict, @@ -75,9 +76,10 @@ def test_checkpoint_unfused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=False, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) - def test_checkpoint_fused_optimizer(self, tmpdir): + def test_checkpoint_fused_optimizer(self, tmpdir, compile_mode): if get_accelerator().device_name() == "cpu": pytest.skip("CPU accelerator does not support this test") config_dict = { @@ -108,7 +110,8 @@ def test_checkpoint_fused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=True, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) # Ignore optimizer states 
checkpoint_correctness_verification(config_dict, @@ -116,9 +119,10 @@ def test_checkpoint_fused_optimizer(self, tmpdir): hidden_dim=hidden_dim, tmpdir=tmpdir, load_optimizer_states=False, - dtype=dtype) + dtype=dtype, + compile_mode=compile_mode) - def test_checkpoint_fp32_optimizer(self, tmpdir): + def test_checkpoint_fp32_optimizer(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -143,4 +147,5 @@ def test_checkpoint_fp32_optimizer(self, tmpdir): models=models, hidden_dim=hidden_dim, tmpdir=tmpdir, - dtype=torch.float32) + dtype=torch.float32, + compile_mode=compile_mode) diff --git a/tests/unit/checkpoint/test_pipeline.py b/tests/unit/checkpoint/test_pipeline.py index c6c228ccada7..68065b730ef1 100644 --- a/tests/unit/checkpoint/test_pipeline.py +++ b/tests/unit/checkpoint/test_pipeline.py @@ -16,7 +16,7 @@ class TestPipelineCheckpoint(DistributedTest): world_size = 4 @pytest.mark.parametrize("zero_stage", [0, 1]) - def test_checkpoint_pipe_engine(self, zero_stage, tmpdir): + def test_checkpoint_pipe_engine(self, zero_stage, tmpdir, compile_mode): skip_on_arch(min_arch=7) config_dict = { @@ -61,7 +61,8 @@ def test_checkpoint_pipe_engine(self, zero_stage, tmpdir): load_optimizer_states=True, load_lr_scheduler_states=True, train_batch=True, - dtype=torch.float16 if zero_stage > 0 else torch.float32) + dtype=torch.float16 if zero_stage > 0 else torch.float32, + compile_mode=compile_mode) @pytest.mark.parametrize( "base_topo,test_topo", diff --git a/tests/unit/checkpoint/test_shared_weights.py b/tests/unit/checkpoint/test_shared_weights.py index ed69073fb81c..084a1908201b 100644 --- a/tests/unit/checkpoint/test_shared_weights.py +++ b/tests/unit/checkpoint/test_shared_weights.py @@ -25,7 +25,7 @@ def __init__(self): class TestCheckpointSharedWeights(DistributedTest): world_size = 2 - def test_checkpoint_shared_weights(self, tmp_path): + def test_checkpoint_shared_weights(self, tmp_path, compile_mode): config = { 
"train_micro_batch_size_per_gpu": 2, "zero_allow_untested_optimizer": True, @@ -41,6 +41,9 @@ def test_checkpoint_shared_weights(self, tmp_path): model=model, optimizer=optimizer, ) + if compile_mode: + deepspeed_engine.compile() + filename = tmp_path / "checkpoint.pt" deepspeed_engine.save_checkpoint(filename, tag="checkpoint") diff --git a/tests/unit/checkpoint/test_sparse.py b/tests/unit/checkpoint/test_sparse.py index 19fbcd81e473..7def94cf2764 100644 --- a/tests/unit/checkpoint/test_sparse.py +++ b/tests/unit/checkpoint/test_sparse.py @@ -25,7 +25,7 @@ class TestSparseCheckpoint(DistributedTest): [True, True], ]) def test_non_strict_load_sparse(self, tmpdir, to_save_model_has_embedding, to_save_model_sparse, - destination_has_embedding, destination_sparse): + destination_has_embedding, destination_sparse, compile_mode): class ModelNoEmbedding(torch.nn.Module): @@ -66,6 +66,10 @@ def forward(self, x, offsets): "sparse_gradients": destination_sparse }) + if compile_mode: + engine_to_save.compile() + engine_destination.compile() + save_folder = os.path.join(tmpdir, 'saved_checkpoint') save_tag = '1' diff --git a/tests/unit/checkpoint/test_tag_validation.py b/tests/unit/checkpoint/test_tag_validation.py index b164c31e52b0..b5aff05d9cb6 100644 --- a/tests/unit/checkpoint/test_tag_validation.py +++ b/tests/unit/checkpoint/test_tag_validation.py @@ -15,7 +15,7 @@ class TestCheckpointValidationTag(DistributedTest): world_size = 2 @pytest.mark.parametrize('valid_mode', ["FAIL", "WARN", "IGNORE"]) - def test_checkpoint_unique_tag(self, tmpdir, valid_mode): + def test_checkpoint_unique_tag(self, tmpdir, valid_mode, compile_mode): config_dict = { "train_batch_size": 2, "steps_per_print": 1, @@ -33,13 +33,15 @@ def test_checkpoint_unique_tag(self, tmpdir, valid_mode): model = SimpleModel(hidden_dim) model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + if compile_mode: + model.compile() if valid_mode == "FAIL": with 
pytest.raises(AssertionError): model.save_checkpoint(save_dir=tmpdir, tag=f"tag-{dist.get_rank()}") else: model.save_checkpoint(save_dir=tmpdir, tag=f"tag-{dist.get_rank()}") - def test_checkpoint_unknown_tag_validation(self, tmpdir): + def test_checkpoint_unknown_tag_validation(self, tmpdir, compile_mode): config_dict = { "train_batch_size": 2, @@ -60,3 +62,5 @@ def test_checkpoint_unknown_tag_validation(self, tmpdir): with pytest.raises(deepspeed.DeepSpeedConfigError): model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + if compile_mode: + model.compile() diff --git a/tests/unit/checkpoint/test_universal_checkpoint.py b/tests/unit/checkpoint/test_universal_checkpoint.py index 27e151103cc4..e5997f140bc9 100644 --- a/tests/unit/checkpoint/test_universal_checkpoint.py +++ b/tests/unit/checkpoint/test_universal_checkpoint.py @@ -213,7 +213,7 @@ def update_gathered_stage3_optimizer(optimizer_state, param_shapes, world_size): @pytest.mark.parametrize("sub_group_size", [-1, 100]) class TestZeROUniversalCheckpointDP(DistributedTest): - def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_size): + def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_size, compile_mode): if dtype == torch.bfloat16 and not bf16_required_version_check(): pytest.skip( " DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA > =11.0 and HW support for BFloat16 to run correctly" @@ -225,6 +225,9 @@ def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_ ds_config["checkpoint"] = {"load_universal": True} univ_model = SimpleModel(hidden_dim, nlayers=2) univ_model = init_ds_engine(univ_model, ds_config, use_torch_adam) + if compile_mode: + univ_model.compile() + univ_model.load_checkpoint(tmpdir, tag=f"{CP_TAG}_universal", load_optimizer_states=load_optim) model_state = univ_model.state_dict() @@ -260,13 +263,16 @@ def _run_test(self, tmpdir, dtype, 
ds_config, load_optim, use_torch_adam, world_ univ_model.destroy() @pytest.mark.world_size(2) - def test_dp_world_size_2to2(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam): - self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2) + def test_dp_world_size_2to2(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam, + compile_mode): + self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2, compile_mode) @pytest.mark.world_size(2) - def test_dp_world_size_4to2(self, baseline_ws4, tmpdir, dtype, ds_config, load_optim, use_torch_adam): - self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2) + def test_dp_world_size_4to2(self, baseline_ws4, tmpdir, dtype, ds_config, load_optim, use_torch_adam, + compile_mode): + self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2, compile_mode) @pytest.mark.world_size(4) - def test_dp_world_size_2to4(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam): - self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 4) + def test_dp_world_size_2to4(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam, + compile_mode): + self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 4, compile_mode) diff --git a/tests/unit/checkpoint/test_zero_optimizer.py b/tests/unit/checkpoint/test_zero_optimizer.py index 85c38d7f5ffd..96646c4388e0 100644 --- a/tests/unit/checkpoint/test_zero_optimizer.py +++ b/tests/unit/checkpoint/test_zero_optimizer.py @@ -22,7 +22,7 @@ class TestZeROCheckpoint(DistributedTest): world_size = 2 @pytest.mark.parametrize('zero_stage', [3]) - def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage): + def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, "optimizer": { @@ -41,14 +41,19 @@ def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage): with 
deepspeed.zero.Init(config_dict_or_path=config_dict): models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_module_only=True, + compile_mode=compile_mode) - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True) - + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer', [(1, False, 'Adam'), (2, False, 'Adam'), (2, True, 'deepspeed_adam'), (3, False, 'Adam'), (3, True, 'deepspeed_adam')]) - def test_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer): + def test_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") @@ -82,13 +87,19 @@ def test_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_op else: models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=True) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_optimizer_states=True, + compile_mode=compile_mode) + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer', [(1, False, "Adam"), (2, False, "Adam"), (2, True, 'deepspeed_adam'), (3, False, 'Adam'), (3, True, 'deepspeed_adam')]) - def test_not_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer): + def test_not_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer, compile_mode): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") @@ -123,10 +134,16 @@ def 
test_not_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, ada else: models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=False) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_optimizer_states=False, + compile_mode=compile_mode) + @pytest.mark.parametrize('compile_mode', [True, False]) @pytest.mark.parametrize('zero_stage', [1, 2]) - def test_hybrid_optimizer_state(self, tmpdir, zero_stage): + def test_hybrid_optimizer_state(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_micro_batch_size_per_gpu": 2, "gradient_accumulation_steps": 2, @@ -149,10 +166,11 @@ def test_hybrid_optimizer_state(self, tmpdir, zero_stage): base_optimizers=optimizers, hidden_dim=hidden_dim, tmpdir=tmpdir, - load_optimizer_states=True) + load_optimizer_states=True, + compile_mode=compile_mode) @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_load_module_only(self, tmpdir, zero_stage): + def test_load_module_only(self, tmpdir, zero_stage, compile_mode): if zero_stage == 0 and get_accelerator().device_name() == "cpu": pytest.skip("CPU Accelerator does not support this test") config_dict = { @@ -176,7 +194,12 @@ def test_load_module_only(self, tmpdir, zero_stage): else: models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_module_only=True, + compile_mode=compile_mode) class ws4_model_checkpoint(DistributedFixture): @@ -218,7 +241,7 @@ def run(self, class_tmpdir, elastic_save, load_optim): class TestZeROElasticCheckpoint(DistributedTest): world_size = 2 - def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, load_optim): + def 
test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, load_optim, compile_mode): config_dict = { "train_batch_size": 2, "optimizer": { @@ -243,6 +266,8 @@ def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, l model, _, _, _ = deepspeed.initialize(config=config_dict, model=models[0], model_parameters=models[0].parameters()) + if compile_mode: + model.compile() run_steps = 8 data_loader = random_dataloader(model=model, total_samples=run_steps, @@ -261,6 +286,8 @@ def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, l model, _, _, _ = deepspeed.initialize(config=config_dict, model=models[1], model_parameters=models[1].parameters()) + if compile_mode: + model.compile() model.load_checkpoint(tmpdir, load_optimizer_states=load_optim) if load_optim: @@ -275,7 +302,7 @@ def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, l model.step() def test_elastic_checkpoint_change_dp(self, ws4_model_checkpoint, class_tmpdir, elastic_save, elastic_load, - load_optim): + load_optim, compile_mode): config_dict = { "train_batch_size": 4, "optimizer": { @@ -295,6 +322,8 @@ def test_elastic_checkpoint_change_dp(self, ws4_model_checkpoint, class_tmpdir, # Load checkpoint with dp world size = 2 model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) + if compile_mode: + model.compile() if load_optim: with pytest.raises(deepspeed.runtime.zero.utils.ZeRORuntimeException): model.load_checkpoint(class_tmpdir, load_optimizer_states=load_optim) @@ -306,7 +335,7 @@ class TestZeROSaveLoadEdgeCase(DistributedTest): world_size = 2 @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_immediate_save_load(self, tmpdir, zero_stage): + def test_immediate_save_load(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 4, "optimizer": { @@ -324,6 +353,8 @@ def test_immediate_save_load(self, tmpdir, zero_stage): model = 
SimpleModel(hidden_dim) ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() ds_model.save_checkpoint(tmpdir) ds_model.load_checkpoint(tmpdir, load_optimizer_states=False, @@ -331,7 +362,7 @@ def test_immediate_save_load(self, tmpdir, zero_stage): load_module_only=False) @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_load_immediate_save(self, tmpdir, zero_stage): + def test_load_immediate_save(self, tmpdir, zero_stage, compile_mode): if zero_stage == 0 and get_accelerator().device_name() == "cpu": pytest.skip("CPU Accelerator does not support this test") config_dict = { @@ -352,6 +383,8 @@ def test_load_immediate_save(self, tmpdir, zero_stage): # 1. pretrain a model and save it ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() data_loader = random_dataloader(model=ds_model, total_samples=1, hidden_dim=hidden_dim, device=ds_model.device) for _, batch in enumerate(data_loader): loss = ds_model(batch[0], batch[1]) @@ -363,6 +396,8 @@ def test_load_immediate_save(self, tmpdir, zero_stage): # 2. 
load and immediately save a model with a fresh ds engine ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() ds_model.load_checkpoint(tmpdir, load_optimizer_states=False, load_lr_scheduler_states=False, @@ -370,7 +405,7 @@ def test_load_immediate_save(self, tmpdir, zero_stage): ds_model.save_checkpoint(tmpdir) @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3]) - def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage): + def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage, compile_mode): config_dict = { "optimizer": { "type": 'Adam' @@ -395,6 +430,8 @@ def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage): # So we config grad_accum=2 and step only once and save_16bit_model ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None) + if compile_mode: + ds_model.compile() data_loader = random_dataloader(model=ds_model, total_samples=2, hidden_dim=hidden_dim, device=ds_model.device) batch = next(iter(data_loader)) @@ -411,11 +448,12 @@ def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage): ds_model.save_checkpoint(tmpdir) +@pytest.mark.parametrize('compile_mode', [True, False]) class TestZeROCheckpointFrozenWeights(DistributedTest): world_size = 2 @pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_load_optimizer_state(self, tmpdir, zero_stage): + def test_load_optimizer_state(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, @@ -443,10 +481,15 @@ def test_load_optimizer_state(self, tmpdir, zero_stage): with deepspeed.zero.Init(enabled=zero_stage == 3, config_dict_or_path=config_dict): models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=True) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + 
load_optimizer_states=True, + compile_mode=compile_mode) @pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_not_load_optimizer_state(self, tmpdir, zero_stage): + def test_not_load_optimizer_state(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, @@ -473,10 +516,15 @@ def test_not_load_optimizer_state(self, tmpdir, zero_stage): with deepspeed.zero.Init(enabled=zero_stage == 3, config_dict_or_path=config_dict): models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=False) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_optimizer_states=False, + compile_mode=compile_mode) @pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_load_module_only(self, tmpdir, zero_stage): + def test_load_module_only(self, tmpdir, zero_stage, compile_mode): config_dict = { "train_batch_size": 2, "optimizer": { @@ -495,10 +543,15 @@ def test_load_module_only(self, tmpdir, zero_stage): with deepspeed.zero.Init(enabled=zero_stage == 3, config_dict_or_path=config_dict): models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)] - checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True) + checkpoint_correctness_verification(config_dict, + models, + hidden_dim, + tmpdir, + load_module_only=True, + compile_mode=compile_mode) @pytest.mark.parametrize('zero_stage', [1, 2]) - def test_save_exclude_frozen_weights(self, tmpdir, zero_stage): + def test_save_exclude_frozen_weights(self, tmpdir, zero_stage, compile_mode): world_size = 1 config_dict = { "train_micro_batch_size_per_gpu": 1, @@ -518,6 +571,8 @@ def test_save_exclude_frozen_weights(self, tmpdir, zero_stage): model = SimpleFrozenModel(hidden_dim, empty_grad=False) ds_engine, _, _, _ = deepspeed.initialize(model=model, model_parameters=model.parameters(), 
config=config_dict) + if compile_mode: + ds_engine.compile() # Validate backwards-compatibility of including frozen parameters in checkpoint all_ckpt_folder = os.path.join(tmpdir, 'all_params') @@ -546,7 +601,7 @@ def test_save_exclude_frozen_weights(self, tmpdir, zero_stage): assert loaded_trainable_param_names == trainable_param_names @pytest.mark.parametrize('zero_stage', [1, 2]) - def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage): + def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage, compile_mode): world_size = 1 config_dict = { "train_micro_batch_size_per_gpu": 1, @@ -566,6 +621,8 @@ def test_save_exclude_custom_frozen_weights(self, tmpdir, zero_stage): model = SimpleFrozenModel(hidden_dim, empty_grad=False) ds_engine, _, _, _ = deepspeed.initialize(model=model, model_parameters=model.parameters(), config=config_dict) + if compile_mode: + ds_engine.compile() # Validate custom state_dict model state_dict_bk = model.state_dict @@ -592,7 +649,7 @@ class TestSaveTensorClone(DistributedTest): @pytest.mark.parametrize('zero_stage', [1, 2]) @pytest.mark.parametrize('use_cpu_device', [True, False]) - def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device): + def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device, compile_mode): config_dict = { "optimizer": { @@ -609,6 +666,8 @@ def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device): ref_model_state_dict = model.state_dict() ds_engine, _, _, _ = deepspeed.initialize(model=model, config_params=config_dict) + if compile_mode: + ds_engine.compile() clone_device = torch.device('cpu') if use_cpu_device else get_accelerator().current_device() clone_state_dict = clone_tensors_for_torch_save(ds_engine.module.state_dict()) compare_state_dicts(ref_model_state_dict, clone_state_dict) @@ -626,8 +685,9 @@ class TestZeRONonDistributed(DistributedTest): world_size = 1 init_distributed = False + @pytest.mark.parametrize('compile_mode', [True, False]) 
@pytest.mark.parametrize('zero_stage', [1, 2, 3]) - def test_chmod_exception_handling(self, monkeypatch, zero_stage): + def test_chmod_exception_handling(self, monkeypatch, zero_stage, compile_mode): config_dict = { "optimizer": { @@ -645,6 +705,8 @@ def test_chmod_exception_handling(self, monkeypatch, zero_stage): model=net, model_parameters=net.parameters()) + if compile_mode: + engine.compile() log_called = False def mock_logger_info(message, *args, **kwargs): diff --git a/tests/unit/inference/test_checkpoint_sharding.py b/tests/unit/inference/test_checkpoint_sharding.py index f1e37ee26536..9c99892e93b6 100644 --- a/tests/unit/inference/test_checkpoint_sharding.py +++ b/tests/unit/inference/test_checkpoint_sharding.py @@ -76,7 +76,8 @@ def run(self, model_name, class_tmpdir): class TestCheckpointShard(DistributedTest): world_size = 2 - def test(self, model_name, dtype, class_tmpdir, save_shard): + def test(self, model_name, dtype, class_tmpdir, save_shard, compile_mode): + world_size = int(os.getenv("WORLD_SIZE", "1")) inf_config = { "replace_with_kernel_inject": True, @@ -95,6 +96,8 @@ def test(self, model_name, dtype, class_tmpdir, save_shard): model = AutoModelForCausalLM.from_config(model_config, torch_dtype=torch.bfloat16) model = model.eval() model = deepspeed.init_inference(model, config=inf_config) + if compile_mode: + model.compile() check_dtype(model, dtype) @@ -102,7 +105,7 @@ def test(self, model_name, dtype, class_tmpdir, save_shard): class TestCheckpointShardinAutoTP(DistributedTest): world_size = 2 - def test(self, model_name, class_tmpdir): + def test(self, model_name, class_tmpdir, compile_mode): def write_checkpoints_json(model_name, class_tmpdir): import json @@ -140,3 +143,5 @@ def write_checkpoints_json(model_name, class_tmpdir): model = AutoModelForCausalLM.from_config(model_config, torch_dtype=torch.bfloat16) model = model.eval() model = deepspeed.init_inference(model, config=inf_config) + if compile_mode: + model.compile() diff --git 
a/tests/unit/inference/test_inference.py b/tests/unit/inference/test_inference.py index 9337eb67ff1e..931b574601bb 100644 --- a/tests/unit/inference/test_inference.py +++ b/tests/unit/inference/test_inference.py @@ -361,6 +361,7 @@ def test( query, inf_kwargs, assert_fn, + compile_mode, perf_meas=True, ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph, enable_triton) @@ -407,6 +408,8 @@ def test( ).max_out_tokens: args.update({'max_out_tokens': pipe.tokenizer.model_max_length}) pipe.model = deepspeed.init_inference(pipe.model, **args) + if compile_mode: + pipe.model.compile() check_injection(pipe.model) # Warm-up queries for perf measurement #for i in range(10): @@ -450,6 +453,7 @@ def test( query, inf_kwargs, assert_fn, + compile_mode, ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False) if invalid_test_msg: @@ -470,6 +474,8 @@ def test( mp_size=self.world_size, dtype=dtype, replace_with_kernel_inject=True) + if compile_mode: + pipe.model.compile() check_injection(pipe.model) # Switch device to GPU so that input tensors are not on CPU pipe.device = torch.device(get_accelerator().device_name(local_rank)) @@ -491,6 +497,7 @@ def test( query, inf_kwargs, assert_fn, + compile_mode, ): model, task = model_w_task dtype = torch.float16 @@ -506,7 +513,8 @@ def test( dtype=dtype, replace_method="auto", replace_with_kernel_inject=True) - + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) assert assert_fn(bs_output, ds_output) @@ -528,7 +536,17 @@ def test( @pytest.mark.parametrize("dtype", [torch.float], ids=["fp32"]) class TestInjectionPolicy(DistributedTest): - def test(self, model_w_task, injection_policy, query, inf_kwargs, assert_fn, dtype, world_size): + def test( + self, + model_w_task, + injection_policy, + query, + inf_kwargs, + assert_fn, + dtype, + world_size, + compile_mode, + ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, 
enable_triton=False) if invalid_test_msg: pytest.skip(invalid_test_msg) @@ -546,6 +564,8 @@ def test(self, model_w_task, injection_policy, query, inf_kwargs, assert_fn, dty mp_size=world_size, dtype=dtype, injection_policy=injection_policy) + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) print(local_rank, "baseline", bs_output) @@ -626,6 +646,7 @@ def test( inf_kwargs, assert_fn, dtype, + compile_mode, keep_module_on_host, ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False) @@ -653,6 +674,8 @@ def test( mp_size=world_size, dtype=dtype, keep_module_on_host=keep_module_on_host) + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) print(local_rank, "baseline", bs_output) @@ -671,6 +694,7 @@ def test_odd_world_size( inf_kwargs, assert_fn, dtype, + compile_mode, keep_module_on_host, ): invalid_test_msg = validate_test(model_w_task, dtype, enable_cuda_graph=False, enable_triton=False) @@ -693,6 +717,8 @@ def test_odd_world_size( mp_size=world_size, dtype=dtype, keep_module_on_host=keep_module_on_host) + if compile_mode: + pipe.model.compile() ds_output = pipe(query, **inf_kwargs) print(local_rank, "baseline", bs_output) @@ -718,7 +744,7 @@ class TestLMCorrectness(DistributedTest): world_size = 1 exec_timeout = 1200 # Give these tests longer to complete - def test(self, model_family, model_name, task): + def test(self, model_family, model_name, task, compile_mode): # imports here to avoid import errors when pytest collects tests import lm_eval import lm_eval.models @@ -776,6 +802,8 @@ def no_pool_bootstrap_stderr(f, xs, iters): replace_with_kernel_inject=True, enable_cuda_graph=False, ) + if compile_mode: + ds_model.compile() check_injection(ds_model) setattr(lm, model_family, ds_model) get_accelerator().synchronize()