Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tensorrt_llm/_torch/modules/fused_moe/communication/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ def combine(
"""
raise NotImplementedError

def destroy(self):
    """Synchronously release held communication resources.

    The base implementation is a deliberate no-op; subclasses that own
    collective-communication buffers override it. Callers must invoke
    this on ALL ranks before discarding the object, since subclass
    cleanup may involve collective operations.
    """

def get_eplb_gathered_statistics(self) -> Optional[torch.Tensor]:
"""
Return gathered EPLB statistics from the last dispatch, if available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ def __init__(
self.deep_ep_buffer = buffer_pool.get_buffer(mapping)
self.deep_ep_buffer.reserve(hidden_size, weight_dtype)

def destroy(self):
    """Drop the DeepEP buffer reference so it is torn down right now.

    Releasing the buffer triggers Buffer.__del__, which performs an
    intranode barrier (a collective operation). Leaving the release to
    garbage collection lets ranks hit that barrier at unpredictable
    times, so some ranks can block in it forever; clearing the
    reference here keeps teardown synchronous across ranks.
    """
    self.deep_ep_buffer = None

@staticmethod
def is_platform_supported() -> bool:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ def __init__(
self.deep_ep_buffer = buffer_pool.get_low_latency_buffer(mapping)
self.deep_ep_buffer.reserve(self.deep_ep_max_num_tokens, hidden_size, num_slots)

def destroy(self):
    """Drop the DeepEP low-latency buffer reference to tear it down now.

    Buffer.__del__ runs an intranode barrier (a collective operation),
    so the release must happen at the same point on every rank. An
    explicit clear here avoids the non-deterministic GC timing that
    would leave some ranks stuck in the barrier indefinitely.
    """
    self.deep_ep_buffer = None

@staticmethod
def is_platform_supported() -> bool:
"""
Expand Down
24 changes: 22 additions & 2 deletions tensorrt_llm/_torch/modules/fused_moe/configurable_moe.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,37 @@ def determine_communication_method(
feasible_workload = self.comm.is_workload_feasible(all_rank_num_tokens, num_chunks)

if not feasible_workload:
# Current comm cannot be used, fallback to AllGather
all_rank_max_num_tokens = max(all_rank_num_tokens)
logger.info(
f"Communication strategy {self.comm.__class__.__name__} "
f"cannot be used (num_chunks={num_chunks}, max_num_tokens={all_rank_max_num_tokens}). "
f"Falling back to AllGatherReduceScatter."
)

# Switch to AllGather (always works)
self.comm.destroy()
self.comm = AllGatherReduceScatter(mapping=self.mapping)

def destroy(self):
    """Release the module's communication resources.

    Must run on ALL ranks before the module is discarded: the DeepEP
    Buffer.__del__ issues an intranode barrier (a collective op), and
    relying on garbage collection lets ranks reach that barrier at
    different times, hanging the job. Prefer using the module as a
    context manager (``with``) so this is called automatically on
    scope exit.
    """
    comm = self.comm
    if comm is None:
        return
    comm.destroy()

def __enter__(self):
    """Enter the context manager and hand back the module itself."""
    return self

def __exit__(self, *_exc_info):
    """Leave the context manager, releasing communication resources.

    Delegates to destroy(); returns None implicitly, so any in-flight
    exception is never suppressed.
    """
    self.destroy()

def _create_comm_strategy_auto(self) -> Communication:
"""
Auto-create the best communication strategy based on hardware and configuration
Expand Down
9 changes: 0 additions & 9 deletions tests/unittest/_torch/modules/moe/moe_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,15 +610,6 @@ def should_skip_multi_gpu(
Returns:
Skip reason string if test should be skipped, None otherwise
"""
# DEEPEPLOWLATENCY hangs on H100 (SM90) in CI multi-GPU tests.
if comm_method == "DEEPEPLOWLATENCY":
capability = torch.cuda.get_device_capability(0)
if capability == (9, 0):
return (
"[CI Hang] DEEPEPLOWLATENCY hangs on H100 (SM90) in "
"multi-GPU tests. Skipping until the issue is resolved."
)

# Only EP modes have ep_size = world_size; TP modes have ep_size = 1
if parallel_mode not in ("DEP", "TEP"):
return None
Expand Down
9 changes: 5 additions & 4 deletions tests/unittest/_torch/modules/moe/test_moe_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,9 @@ def _test_moe_worker_impl(
quantize_util, "weight_loading_mode", MoEWeightLoadingMode.VANILLA
)

with moe_load_balancer:
# Create and setup fused MoE module
fused_moe = create_moe(
with (
moe_load_balancer,
create_moe(
routing_method=routing_method,
reduce_results=True,
model_config=model_cfg,
Expand All @@ -542,7 +542,8 @@ def _test_moe_worker_impl(
swiglu_beta=swiglu_tensors["swiglu_beta"] if swiglu_tensors else None,
swiglu_limit=swiglu_tensors["swiglu_limit"] if swiglu_tensors else None,
weight_loading_mode=weight_loading_mode,
)
) as fused_moe,
):
fused_moe.load_weights([weights])
fused_moe.post_load_weights()
fused_moe.cuda(f"cuda:{mapping.rank}")
Expand Down
Loading