Skip to content

Commit fa32dde

Browse files
committed
Fix cloud multi-nodes
* Copy ssh key to allow connections from master to workers * Use local ip for manager's ip such that workers can find it and connect to it * Fix incompatibility between pandas and numpy 2.0.0
1 parent 0f34dd2 commit fa32dde

File tree

6 files changed

+62
-14
lines changed

6 files changed

+62
-14
lines changed

config/cloud-multinodes-system.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ system:
55
- name: manager
66
# Use 1.1.1.1 as an ip placeholder
77
ip: 1.1.1.1
8+
port: 5000
89
# Use this node as the master node or not
910
main: true
1011
# User to use in remote milabench operations

milabench/commands/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,13 @@ def _get_main_and_workers(self):
939939
def _argv(self, **_) -> List:
940940
manager, nodes = self._get_main_and_workers()
941941

942+
# Find local ip such that workers can connect to the port
943+
for manager_ip in manager["ipaddrlist"]:
944+
if ":" in manager_ip or manager_ip == "127.0.0.1":
945+
continue
946+
if all(str.isnumeric(n) for n in manager_ip.split(".")):
947+
break
948+
942949
num_machines = max(1, len(nodes) + 1)
943950

944951
# Cant do that maybe this run is constrained
@@ -976,9 +983,9 @@ def _argv(self, **_) -> List:
976983
f"--machine_rank={self.rank}",
977984
f"--num_machines={num_machines}",
978985
*deepspeed_argv,
979-
f"--gradient_accumulation_steps={self.pack.config.get('gradient_accumulation_steps', 1)}",
980-
f"--num_cpu_threads_per_process={cpu_per_process}",
981-
f"--main_process_ip={manager['ip']}",
986+
f"--gradient_accumulation_steps={self.pack.config['gradient_accumulation_steps']}",
987+
f"--num_cpu_threads_per_process={self.pack.config['argv']['--cpus_per_gpu']}",
988+
f"--main_process_ip={manager_ip}",
982989
f"--main_process_port={manager['port']}",
983990
f"--num_processes={nproc}",
984991
*self.accelerate_argv,

milabench/remote.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,11 @@
22
import os
33
import sys
44

5-
import yaml
6-
7-
from milabench.fs import XPath
8-
95
from . import ROOT_FOLDER
106
from .commands import (
117
CmdCommand,
128
Command,
139
ListCommand,
14-
SCPCommand,
1510
SequenceCommand,
1611
SSHCommand,
1712
VoidCommand,

milabench/scripts/covalent/__main__.py

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,24 +98,63 @@ def _popen(cmd, *args, _env=None, **kwargs):
9898
assert result and result[0]
9999

100100
all_connection_attributes, _ = result
101+
ssh_key_file:str = None
101102
for hostname, connection_attributes in all_connection_attributes.items():
102103
print(f"hostname::>{hostname}")
103104
for attribute,value in connection_attributes.items():
104105
if attribute == "hostname":
105106
continue
106107
print(f"{attribute}::>{value}")
107108

109+
ssh_key_file = (
110+
ssh_key_file or connection_attributes["ssh_key_file"]
111+
)
112+
113+
if len(all_connection_attributes) >= 1:
114+
fn = pathlib.Path(ssh_key_file)
115+
dispatch_id = ct.dispatch(
116+
ct.lattice(executor.cp_to_remote), disable_run=False
117+
)(f".ssh/{fn.name.split('.')[0]}", str(fn))
118+
119+
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result
120+
108121
if argv:
109122
dispatch_id = ct.dispatch(
110-
ct.lattice(
111-
lambda:ct.electron(_popen, executor=executor)(argv)
112-
),
113-
disable_run=False
123+
ct.lattice(executor.list_running_instances), disable_run=False
114124
)()
115125

116126
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result
117127

118-
return_code, _, _ = result if result is not None else (1, "", "")
128+
assert result
129+
130+
dispatch_ids = set()
131+
for connection_attributes in result.get(
132+
(executor.state_prefix, executor.state_id),
133+
{"env": None}
134+
).values():
135+
kwargs = {
136+
**_get_executor_kwargs(args),
137+
**connection_attributes
138+
}
139+
del kwargs["env"]
140+
141+
_executor:ct.executor.BaseExecutor = executor_cls(**kwargs)
142+
143+
dispatch_ids.add(
144+
ct.dispatch(
145+
ct.lattice(
146+
lambda:ct.electron(_popen, executor=_executor)(argv)
147+
),
148+
disable_run=False
149+
)()
150+
)
151+
152+
for dispatch_id in dispatch_ids:
153+
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result
154+
155+
_return_code, _, _ = result if result is not None else (1, "", "")
156+
return_code = return_code or _return_code
157+
119158
finally:
120159
if args.teardown:
121160
result = executor.stop_cloud_instance().result

milabench/system.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ def _resolve_ip(ip):
258258
if not offline:
259259
# Resolve the IP
260260
try:
261+
# Workaround error with `gethostbyaddr` on azure DNS (like
262+
# `inmako.eastus2.cloudapp.azure.com`). A proper fix might be a
263+
# correct network config in terraform.
264+
# socket.herror: [Errno 1] Unknown host
261265
hostname, aliaslist, ipaddrlist = socket.gethostbyname_ex(ip)
262266
lazy_raise = None
263267

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ blessed = "^1.19.1"
2626
pathspec = "^0.9.0"
2727
cp-template = "^0.3.0"
2828
pandas = ">=1.4.2"
29-
numpy = ">=1.23.0,<2.0.0"
29+
# Work around for compatibility issue between numpy 2.0.0 and pandas
30+
# https://github.com/numpy/numpy/issues/26710
31+
numpy = "^1.23.0"
3032
pynvml = "^11.4.1"
3133
tqdm = "^4.64.1"
3234
pip-tools = "^7.4.1"

0 commit comments

Comments
 (0)