8 changes: 8 additions & 0 deletions tests/conftest.py
@@ -31,6 +31,7 @@ def pytest_configure(config):
def pytest_addoption(parser):
parser.addoption("--torch_ver", default=None, type=str)
parser.addoption("--cuda_ver", default=None, type=str)
parser.addoption("--enable-compile-mode", action="store_true", help="Run both compiled/non-compiled versions")
@stas00 (Collaborator), Nov 3, 2025:

Perhaps this would be less ambiguous about what it implies?

Suggested change
parser.addoption("--enable-compile-mode", action="store_true", help="Run both compiled/non-compiled versions")
parser.addoption("--enable-torch-compile", action="store_true", help="Run both compiled/non-compiled versions")

PR author (Contributor):

@stas00, regarding your questions:

  1. There are some basic torch.compile tests that are always run (e.g. deepspeed-fork/tests/unit/runtime/compile/util.py). This PR adds torch.compile coverage for many more scenarios using the same logic as the original tests.
  2. I'm sure there are ways to do this other than passing a parameter, but this is the standard way to parametrize pytest tests, and the pytest logs then clearly show which config is running. Choosing another approach may have some advantages, but could make it less clear which scenario is actually running (e.g. in your example this wouldn't explicitly be printed in the test name). Example:
    7.84s call unit/checkpoint/test_latest_checkpoint.py::TestLatestCheckpoint::test_existing_latest[False]

I'm not opposed to a different approach, but I'm not convinced it offers an actual advantage. I'll wait for feedback from @tohtana and @tjruwase.

Regarding the flag, I used the name suggested by @tohtana, and it makes some semantic sense: the flag enables compile mode in the tests; it doesn't enable torch.compile in general (which might make the other name more confusing). I don't mind changing it to whatever there is consensus on.
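
For reference, a minimal, self-contained sketch of the mechanism under discussion, mirroring the conftest.py hook added in this PR; the test_roundtrip function and the invocation path are illustrative, not part of the PR:

# conftest.py (sketch, mirroring this PR's hook)
def pytest_addoption(parser):
    parser.addoption("--enable-compile-mode", action="store_true",
                     help="Run both compiled/non-compiled versions")

def pytest_generate_tests(metafunc):
    # Any test that declares a `compile_mode` argument is parametrized here.
    if "compile_mode" in metafunc.fixturenames:
        enabled = metafunc.config.getoption("--enable-compile-mode")
        # Without the flag, only the non-compiled variant runs.
        metafunc.parametrize("compile_mode", [False, True] if enabled else [False])

# test_example.py (hypothetical consumer)
def test_roundtrip(compile_mode):
    # The parameter value shows up in the test ID: test_roundtrip[False],
    # and with the flag also test_roundtrip[True].
    assert compile_mode in (False, True)

# Invocation (assumed layout): pytest tests/unit --enable-compile-mode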

@stas00 (Collaborator):

Re: the flag: whatever the team feels is more intuitive works for me.

The rest I took out of this thread, as it doesn't belong in this particular discussion.



def validate_version(expected, found):
@@ -70,6 +71,13 @@ def pytest_runtest_call(item):
item.runtest = lambda: True # Dummy function so test is not run twice


def pytest_generate_tests(metafunc):
if "compile_mode" in metafunc.fixturenames:
compile_testing_enabled = metafunc.config.getoption("--enable-compile-mode")
params = [False, True] if compile_testing_enabled else [False]
metafunc.parametrize("compile_mode", params)


# We allow DistributedTest to reuse distributed environments. When the last
# test for a class is run, we want to make sure those distributed environments
# are destroyed.
5 changes: 4 additions & 1 deletion tests/unit/checkpoint/common.py
@@ -174,11 +174,14 @@ def checkpoint_correctness_verification(config_dict,
empty_tag=False,
seq_dataloader=False,
load_module_only=False,
dtype=None):
dtype=None,
compile_mode=False):
if dtype is None:
dtype = preferred_dtype()

ds_model = create_deepspeed_model(config_dict=config_dict, model=models[0], base_optimizer=base_optimizers[0])
if compile_mode:
ds_model.compile()

if seq_dataloader:
data_loader = sequence_dataloader(model=ds_model,
9 changes: 6 additions & 3 deletions tests/unit/checkpoint/test_latest_checkpoint.py
@@ -19,7 +19,7 @@
class TestLatestCheckpoint(DistributedTest):
world_size = 1

def test_existing_latest(self, tmpdir):
def test_existing_latest(self, tmpdir, compile_mode):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
@@ -39,9 +39,10 @@ def test_existing_latest(self, tmpdir):
load_optimizer_states=True,
load_lr_scheduler_states=False,
empty_tag=True,
dtype=torch.float)
dtype=torch.float,
compile_mode=compile_mode)

def test_missing_latest(self, tmpdir):
def test_missing_latest(self, tmpdir, compile_mode):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
@@ -55,5 +56,7 @@ def test_missing_latest(self, tmpdir):
hidden_dim = 10
model = SimpleModel(hidden_dim)
model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
if compile_mode:
model.compile()
# should be no-op, since latest doesn't exist
model.load_checkpoint(tmpdir)
10 changes: 6 additions & 4 deletions tests/unit/checkpoint/test_lr_scheduler.py
@@ -20,7 +20,7 @@
class TestLRSchedulerCheckpoint(DistributedTest):
world_size = 2

def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload):
def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload, compile_mode):
if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
pytest.skip("cpu-adam is not compatible")
if get_accelerator().device_name() == 'cpu':
@@ -70,9 +70,10 @@ def test_checkpoint_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload):
hidden_dim,
tmpdir,
load_optimizer_states=False,
load_lr_scheduler_states=True)
load_lr_scheduler_states=True,
compile_mode=compile_mode)

def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload):
def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload, compile_mode):
if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
pytest.skip("cpu-adam is not compatible")
if get_accelerator().device_name() == 'cpu':
@@ -117,4 +118,5 @@ def test_checkpoint_no_lr_scheduler(self, tmpdir, zero_stage, use_cpu_offload):
hidden_dim,
tmpdir,
load_optimizer_states=False,
load_lr_scheduler_states=False)
load_lr_scheduler_states=False,
compile_mode=compile_mode)
5 changes: 3 additions & 2 deletions tests/unit/checkpoint/test_moe_checkpoint.py
@@ -39,7 +39,7 @@ def test_checkpoint_moe(self, tmpdir, ep_size):
dtype=torch.float16)

@pytest.mark.parametrize("ep_size, load_optim_states", [(4, True), (4, False), (2, True), (2, False)])
def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states):
def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states, compile_mode):
if not required_torch_version(min_version=1.8):
pytest.skip("DeepSpeed MoE tests need torch 1.8 or higher to run correctly")

@@ -80,4 +80,5 @@ def test_checkpoint_moe_and_zero(self, tmpdir, ep_size, load_optim_states):
empty_tag=True,
base_optimizers=optimizers,
seq_dataloader=True,
dtype=torch.float16)
dtype=torch.float16,
compile_mode=compile_mode)
21 changes: 13 additions & 8 deletions tests/unit/checkpoint/test_other_optimizer.py
@@ -18,7 +18,7 @@ class TestOtherOptimizerCheckpoint(DistributedTest):
world_size = 2

@pytest.mark.skipif(not deepspeed.ops.__compatible_ops__[FusedLambBuilder.NAME], reason="lamb is not compatible")
def test_checkpoint_unfused_optimizer(self, tmpdir):
def test_checkpoint_unfused_optimizer(self, tmpdir, compile_mode):
#if not get_accelerator().is_fp16_supported():
# pytest.skip("fp16 is not supported")
config_dict = {
@@ -67,17 +67,19 @@ def test_checkpoint_unfused_optimizer(self, tmpdir):
hidden_dim=hidden_dim,
tmpdir=tmpdir,
load_optimizer_states=True,
dtype=dtype)
dtype=dtype,
compile_mode=compile_mode)

# Ignore optimizer states
checkpoint_correctness_verification(config_dict,
models=models,
hidden_dim=hidden_dim,
tmpdir=tmpdir,
load_optimizer_states=False,
dtype=dtype)
dtype=dtype,
compile_mode=compile_mode)

def test_checkpoint_fused_optimizer(self, tmpdir):
def test_checkpoint_fused_optimizer(self, tmpdir, compile_mode):
if get_accelerator().device_name() == "cpu":
pytest.skip("CPU accelerator does not support this test")
config_dict = {
@@ -108,17 +110,19 @@ def test_checkpoint_fused_optimizer(self, tmpdir):
hidden_dim=hidden_dim,
tmpdir=tmpdir,
load_optimizer_states=True,
dtype=dtype)
dtype=dtype,
compile_mode=compile_mode)

# Ignore optimizer states
checkpoint_correctness_verification(config_dict,
models=models,
hidden_dim=hidden_dim,
tmpdir=tmpdir,
load_optimizer_states=False,
dtype=dtype)
dtype=dtype,
compile_mode=compile_mode)

def test_checkpoint_fp32_optimizer(self, tmpdir):
def test_checkpoint_fp32_optimizer(self, tmpdir, compile_mode):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
@@ -143,4 +147,5 @@ def test_checkpoint_fp32_optimizer(self, tmpdir):
models=models,
hidden_dim=hidden_dim,
tmpdir=tmpdir,
dtype=torch.float32)
dtype=torch.float32,
compile_mode=compile_mode)
5 changes: 3 additions & 2 deletions tests/unit/checkpoint/test_pipeline.py
@@ -16,7 +16,7 @@ class TestPipelineCheckpoint(DistributedTest):
world_size = 4

@pytest.mark.parametrize("zero_stage", [0, 1])
def test_checkpoint_pipe_engine(self, zero_stage, tmpdir):
def test_checkpoint_pipe_engine(self, zero_stage, tmpdir, compile_mode):
skip_on_arch(min_arch=7)

config_dict = {
Expand Down Expand Up @@ -61,7 +61,8 @@ def test_checkpoint_pipe_engine(self, zero_stage, tmpdir):
load_optimizer_states=True,
load_lr_scheduler_states=True,
train_batch=True,
dtype=torch.float16 if zero_stage > 0 else torch.float32)
dtype=torch.float16 if zero_stage > 0 else torch.float32,
compile_mode=compile_mode)

@pytest.mark.parametrize(
"base_topo,test_topo",
5 changes: 4 additions & 1 deletion tests/unit/checkpoint/test_shared_weights.py
@@ -25,7 +25,7 @@ def __init__(self):
class TestCheckpointSharedWeights(DistributedTest):
world_size = 2

def test_checkpoint_shared_weights(self, tmp_path):
def test_checkpoint_shared_weights(self, tmp_path, compile_mode):
config = {
"train_micro_batch_size_per_gpu": 2,
"zero_allow_untested_optimizer": True,
@@ -41,6 +41,9 @@ def test_checkpoint_shared_weights(self, tmp_path):
model=model,
optimizer=optimizer,
)
if compile_mode:
deepspeed_engine.compile()

filename = tmp_path / "checkpoint.pt"
deepspeed_engine.save_checkpoint(filename, tag="checkpoint")

6 changes: 5 additions & 1 deletion tests/unit/checkpoint/test_sparse.py
@@ -25,7 +25,7 @@ class TestSparseCheckpoint(DistributedTest):
[True, True],
])
def test_non_strict_load_sparse(self, tmpdir, to_save_model_has_embedding, to_save_model_sparse,
destination_has_embedding, destination_sparse):
destination_has_embedding, destination_sparse, compile_mode):

class ModelNoEmbedding(torch.nn.Module):

@@ -66,6 +66,10 @@ def forward(self, x, offsets):
"sparse_gradients": destination_sparse
})

if compile_mode:
engine_to_save.compile()
engine_destination.compile()

save_folder = os.path.join(tmpdir, 'saved_checkpoint')
save_tag = '1'

8 changes: 6 additions & 2 deletions tests/unit/checkpoint/test_tag_validation.py
@@ -15,7 +15,7 @@ class TestCheckpointValidationTag(DistributedTest):
world_size = 2

@pytest.mark.parametrize('valid_mode', ["FAIL", "WARN", "IGNORE"])
def test_checkpoint_unique_tag(self, tmpdir, valid_mode):
def test_checkpoint_unique_tag(self, tmpdir, valid_mode, compile_mode):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
@@ -33,13 +33,15 @@ def test_checkpoint_unique_tag(self, tmpdir, valid_mode):
model = SimpleModel(hidden_dim)

model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
if compile_mode:
model.compile()
if valid_mode == "FAIL":
with pytest.raises(AssertionError):
model.save_checkpoint(save_dir=tmpdir, tag=f"tag-{dist.get_rank()}")
else:
model.save_checkpoint(save_dir=tmpdir, tag=f"tag-{dist.get_rank()}")

def test_checkpoint_unknown_tag_validation(self, tmpdir):
def test_checkpoint_unknown_tag_validation(self, tmpdir, compile_mode):

config_dict = {
"train_batch_size": 2,
@@ -60,3 +62,5 @@ def test_checkpoint_unknown_tag_validation(self, tmpdir):

with pytest.raises(deepspeed.DeepSpeedConfigError):
model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
if compile_mode:
model.compile()
20 changes: 13 additions & 7 deletions tests/unit/checkpoint/test_universal_checkpoint.py
@@ -213,7 +213,7 @@ def update_gathered_stage3_optimizer(optimizer_state, param_shapes, world_size):
@pytest.mark.parametrize("sub_group_size", [-1, 100])
class TestZeROUniversalCheckpointDP(DistributedTest):

def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_size):
def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_size, compile_mode):
if dtype == torch.bfloat16 and not bf16_required_version_check():
pytest.skip(
" DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA > =11.0 and HW support for BFloat16 to run correctly"
@@ -225,6 +225,9 @@ def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_
ds_config["checkpoint"] = {"load_universal": True}
univ_model = SimpleModel(hidden_dim, nlayers=2)
univ_model = init_ds_engine(univ_model, ds_config, use_torch_adam)
if compile_mode:
univ_model.compile()

univ_model.load_checkpoint(tmpdir, tag=f"{CP_TAG}_universal", load_optimizer_states=load_optim)

model_state = univ_model.state_dict()
@@ -260,13 +263,16 @@ def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam, world_
univ_model.destroy()

@pytest.mark.world_size(2)
def test_dp_world_size_2to2(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam):
self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2)
def test_dp_world_size_2to2(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam,
compile_mode):
self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2, compile_mode)

@pytest.mark.world_size(2)
def test_dp_world_size_4to2(self, baseline_ws4, tmpdir, dtype, ds_config, load_optim, use_torch_adam):
self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2)
def test_dp_world_size_4to2(self, baseline_ws4, tmpdir, dtype, ds_config, load_optim, use_torch_adam,
compile_mode):
self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 2, compile_mode)

@pytest.mark.world_size(4)
def test_dp_world_size_2to4(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam):
self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 4)
def test_dp_world_size_2to4(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam,
compile_mode):
self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam, 4, compile_mode)