reduce rendezvous and sync port timeout to 5s #1455

Merged
6 changes: 6 additions & 0 deletions dlrover/python/common/constants.py
@@ -387,6 +387,12 @@ class JobConstant(object):
 
     TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 15
 
+    # sleep 5s before next rendezvous round
+    RENDEZVOUS_DEFAULT_INTERVAL = 5
+
+    # sleep 5s before next port synchronization
+    SYNC_PORTS_DEFAULT_INTERVAL = 5
+
 
 class Accelerators(object):
     NVIDIA_GPU = "nvidia.com/gpu"
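The two new constants are plain class attributes on JobConstant (see the hunk above), so they can be read, and if needed overridden before the agent starts, without any other API. Below is a minimal sketch of such a tuning hook; `tune_intervals` is hypothetical and not part of dlrover.

```python
from dlrover.python.common.constants import JobConstant


def tune_intervals(rendezvous_s: int = 5, sync_ports_s: int = 5) -> None:
    """Hypothetical tuning helper (not part of dlrover).

    The rendezvous sleep calls read JobConstant.RENDEZVOUS_DEFAULT_INTERVAL
    at call time, so overriding the class attribute changes how long the
    agent waits between rendezvous rounds.
    """
    JobConstant.RENDEZVOUS_DEFAULT_INTERVAL = rendezvous_s
    JobConstant.SYNC_PORTS_DEFAULT_INTERVAL = sync_ports_s


if __name__ == "__main__":
    tune_intervals(rendezvous_s=2, sync_ports_s=2)
    print(JobConstant.RENDEZVOUS_DEFAULT_INTERVAL)  # 2
    print(JobConstant.SYNC_PORTS_DEFAULT_INTERVAL)  # 2
```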
10 changes: 5 additions & 5 deletions dlrover/python/elastic_agent/torch/training.py
@@ -354,9 +354,7 @@
                         "and waits for more nodes."
                     )
                     start_pending = time.time()
-                    time.sleep(
-                        JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL
-                    )
+                    time.sleep(JobConstant.RENDEZVOUS_DEFAULT_INTERVAL)
                     start_join = time.time()
                     if start_join - start_pending > self.pend_timeout:
                         raise TimeoutError(
@@ -373,7 +371,7 @@
                     err_msg, level=TrainingExceptionLevel.RDZV_ERROR
                 )
                 raise TimeoutError(err_msg)
-            time.sleep(JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL)
+            time.sleep(JobConstant.RENDEZVOUS_DEFAULT_INTERVAL)
             rank = list(world.keys()).index(self._node_rank)
             world_size = len(world)
             logger.info(

Codecov / codecov/patch: added line dlrover/python/elastic_agent/torch/training.py#L374 is not covered by tests.
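For orientation, here is a simplified, self-contained sketch of the polling shape that both hunks above touch, with stand-in names (`get_world`, `join_timeout`) rather than the real dlrover rendezvous handler: the agent keeps asking the master for the communication world and sleeps RENDEZVOUS_DEFAULT_INTERVAL seconds between attempts, so this PR shortens each retry wait from 15s to 5s.

```python
import time

# Stand-in for JobConstant.RENDEZVOUS_DEFAULT_INTERVAL introduced in this PR.
RENDEZVOUS_DEFAULT_INTERVAL = 5


def wait_for_world(get_world, join_timeout=600):
    """Poll the master for a non-empty world, sleeping between attempts."""
    start_join = time.time()
    while True:
        world = get_world()
        if world:
            return world
        if time.time() - start_join > join_timeout:
            raise TimeoutError(f"Timeout {join_timeout}s of joining rendezvous.")
        time.sleep(RENDEZVOUS_DEFAULT_INTERVAL)
```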
@@ -1185,7 +1183,9 @@
         """Shutdown the executor to save the checkpoint."""
         self._save_ckpt_executor.shutdown(wait=False)
 
-    def sync_training_ports(self, interval=20):
+    def sync_training_ports(
+        self, interval=JobConstant.SYNC_PORTS_DEFAULT_INTERVAL
+    ):
         logger.info(f"Accelerator: {self._config.accelerator}")
         if (
             self._config.accelerator == Accelerators.ASCEND_NPU
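One side note on the sync_training_ports change (plain Python semantics, not dlrover-specific): the default value JobConstant.SYNC_PORTS_DEFAULT_INTERVAL is evaluated once, when the def statement runs, so changing the constant afterwards does not affect the already-bound default; callers that need a different interval must pass it explicitly. A minimal sketch:

```python
class JobConstant:
    SYNC_PORTS_DEFAULT_INTERVAL = 5


def sync_training_ports(interval=JobConstant.SYNC_PORTS_DEFAULT_INTERVAL):
    return interval


print(sync_training_ports())             # 5, captured when the def ran
JobConstant.SYNC_PORTS_DEFAULT_INTERVAL = 30
print(sync_training_ports())             # still 5
print(sync_training_ports(interval=30))  # 30, explicit override
```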