Skip to content

Commit 299a67f

Browse files
committed
Revert "Revert "Revert "[Job Submission][refactor 1/N] Add AgentInfo to GCSNodeInfo (…" (ray-project#27308)"
This reverts commit ccf4116.
1 parent efee158 commit 299a67f

23 files changed

+115
-226
lines changed

dashboard/agent.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import argparse
22
import asyncio
33
import io
4+
import json
45
import logging
56
import logging.handlers
67
import os
@@ -12,10 +13,11 @@
1213
import ray._private.utils
1314
import ray.dashboard.consts as dashboard_consts
1415
import ray.dashboard.utils as dashboard_utils
16+
import ray.experimental.internal_kv as internal_kv
1517
from ray._private.gcs_pubsub import GcsAioPublisher, GcsPublisher
1618
from ray._private.gcs_utils import GcsAioClient, GcsClient
1719
from ray._private.ray_logging import setup_component_logger
18-
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc, common_pb2
20+
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc
1921
from ray.experimental.internal_kv import (
2022
_initialize_internal_kv,
2123
_internal_kv_initialized,
@@ -260,20 +262,22 @@ async def _check_parent():
260262
# TODO: Use async version if performance is an issue
261263
# -1 should indicate that http server is not started.
262264
http_port = -1 if not self.http_server else self.http_server.http_port
265+
internal_kv._internal_kv_put(
266+
f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}",
267+
json.dumps([http_port, self.grpc_port]),
268+
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
269+
)
263270

264271
# Register agent to agent manager.
265272
raylet_stub = agent_manager_pb2_grpc.AgentManagerServiceStub(
266273
self.aiogrpc_raylet_channel
267274
)
275+
268276
await raylet_stub.RegisterAgent(
269277
agent_manager_pb2.RegisterAgentRequest(
270-
agent_info=common_pb2.AgentInfo(
271-
id=self.agent_id,
272-
pid=os.getpid(),
273-
grpc_port=self.grpc_port,
274-
http_port=http_port,
275-
ip_address=self.ip,
276-
)
278+
agent_id=self.agent_id,
279+
agent_port=self.grpc_port,
280+
agent_ip_address=self.ip,
277281
)
278282
)
279283

dashboard/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from ray._private.ray_constants import env_integer
22

33
DASHBOARD_LOG_FILENAME = "dashboard.log"
4+
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
45
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
56
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2
67
RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME = "RAY_STATE_SERVER_MAX_HTTP_REQUEST"

dashboard/modules/node/node_head.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import asyncio
2+
import json
23
import logging
34
import re
45
import time
56

67
import aiohttp.web
78

89
import ray._private.utils
10+
import ray.dashboard.consts as dashboard_consts
911
import ray.dashboard.optional_utils as dashboard_optional_utils
1012
import ray.dashboard.utils as dashboard_utils
1113
from ray._private import ray_constants
@@ -129,9 +131,10 @@ async def _update_nodes(self):
129131
try:
130132
nodes = await self._get_nodes()
131133

134+
alive_node_ids = []
135+
alive_node_infos = []
132136
node_id_to_ip = {}
133137
node_id_to_hostname = {}
134-
agents = dict(DataSource.agents)
135138
for node in nodes.values():
136139
node_id = node["nodeId"]
137140
ip = node["nodeManagerAddress"]
@@ -147,10 +150,20 @@ async def _update_nodes(self):
147150
node_id_to_hostname[node_id] = hostname
148151
assert node["state"] in ["ALIVE", "DEAD"]
149152
if node["state"] == "ALIVE":
150-
agents[node_id] = [
151-
node["agentInfo"]["httpPort"],
152-
node["agentInfo"]["grpcPort"],
153-
]
153+
alive_node_ids.append(node_id)
154+
alive_node_infos.append(node)
155+
156+
agents = dict(DataSource.agents)
157+
for node_id in alive_node_ids:
158+
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}" f"{node_id}"
159+
# TODO: Use async version if performance is an issue
160+
agent_port = ray.experimental.internal_kv._internal_kv_get(
161+
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
162+
)
163+
if agent_port:
164+
agents[node_id] = json.loads(agent_port)
165+
for node_id in agents.keys() - set(alive_node_ids):
166+
agents.pop(node_id, None)
154167

155168
DataSource.node_id_to_ip.reset(node_id_to_ip)
156169
DataSource.node_id_to_hostname.reset(node_id_to_hostname)

dashboard/modules/node/tests/test_node.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import traceback
77
import random
88
import pytest
9-
import psutil
109
import ray
1110
import threading
1211
from datetime import datetime, timedelta
@@ -19,7 +18,6 @@
1918
wait_for_condition,
2019
wait_until_succeeded_without_exception,
2120
)
22-
from ray._private.state import state
2321

2422

2523
logger = logging.getLogger(__name__)
@@ -350,33 +348,5 @@ def verify():
350348
wait_for_condition(verify, timeout=15)
351349

352350

353-
# See detail: https://github.com/ray-project/ray/issues/24361
354-
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
355-
def test_node_register_with_agent(ray_start_cluster_head):
356-
def test_agent_port(pid, port):
357-
p = psutil.Process(pid)
358-
assert p.cmdline()[2].endswith("dashboard/agent.py")
359-
360-
for c in p.connections():
361-
if c.status == psutil.CONN_LISTEN and c.laddr.port == port:
362-
return
363-
assert False
364-
365-
def test_agent_process(pid):
366-
p = psutil.Process(pid)
367-
assert p.cmdline()[2].endswith("dashboard/agent.py")
368-
369-
for node_info in state.node_table():
370-
agent_info = node_info["AgentInfo"]
371-
assert agent_info["IpAddress"] == node_info["NodeManagerAddress"]
372-
test_agent_port(agent_info["Pid"], agent_info["GrpcPort"])
373-
if agent_info["HttpPort"] >= 0:
374-
test_agent_port(agent_info["Pid"], agent_info["HttpPort"])
375-
else:
376-
# Port conflicts may happen because the previous
377-
# test did not kill the agent cleanly
378-
assert agent_info["HttpPort"] == -1
379-
380-
381351
if __name__ == "__main__":
382352
sys.exit(pytest.main(["-v", __file__]))

dashboard/tests/test_dashboard.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def test_basic(ray_start_with_dashboard):
110110
"""Dashboard test that starts a Ray cluster with a dashboard server running,
111111
then hits the dashboard API and asserts that it receives sensible data."""
112112
address_info = ray_start_with_dashboard
113+
node_id = address_info["node_id"]
113114
gcs_client = make_gcs_client(address_info)
114115
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
115116

@@ -145,6 +146,11 @@ def test_basic(ray_start_with_dashboard):
145146
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
146147
)
147148
assert dashboard_rpc_address is not None
149+
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
150+
agent_ports = ray.experimental.internal_kv._internal_kv_get(
151+
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
152+
)
153+
assert agent_ports is not None
148154

149155

150156
def test_raylet_and_agent_share_fate(shutdown_only):
@@ -789,6 +795,7 @@ def test_dashboard_port_conflict(ray_start_with_dashboard):
789795
)
790796
def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
791797
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True
798+
792799
all_processes = ray._private.worker._global_node.all_processes
793800
dashboard_info = all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][0]
794801
dashboard_proc = psutil.Process(dashboard_info.process.pid)

python/ray/_private/state.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,6 @@ def node_table(self):
164164
"RayletSocketName": item.raylet_socket_name,
165165
"MetricsExportPort": item.metrics_export_port,
166166
"NodeName": item.node_name,
167-
"AgentInfo": {
168-
"IpAddress": item.agent_info.ip_address,
169-
"GrpcPort": item.agent_info.grpc_port,
170-
"HttpPort": item.agent_info.http_port,
171-
"Pid": item.agent_info.pid,
172-
},
173167
}
174168
node_info["alive"] = node_info["Alive"]
175169
node_info["Resources"] = (

python/ray/_private/worker.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import atexit
22
import faulthandler
33
import functools
4-
import grpc
54
import hashlib
65
import inspect
76
import io
@@ -757,6 +756,7 @@ def sigterm_handler(signum, frame):
757756

758757
def print_logs(self):
759758
"""Prints log messages from workers on all nodes in the same job."""
759+
import grpc
760760

761761
subscriber = self.gcs_log_subscriber
762762
subscriber.subscribe()
@@ -1902,11 +1902,6 @@ def connect(
19021902
if mode == SCRIPT_MODE:
19031903
raise e
19041904
elif mode == WORKER_MODE:
1905-
if isinstance(e, grpc.RpcError) and e.code() in (
1906-
grpc.StatusCode.UNAVAILABLE,
1907-
grpc.StatusCode.UNKNOWN,
1908-
):
1909-
raise e
19101905
traceback_str = traceback.format_exc()
19111906
ray._private.utils.publish_error_to_driver(
19121907
ray_constants.VERSION_MISMATCH_PUSH_ERROR,

python/ray/tests/BUILD

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ py_test_module_list(
6767
"test_healthcheck.py",
6868
"test_kill_raylet_signal_log.py",
6969
"test_memstat.py",
70-
"test_protobuf_compatibility.py",
71-
"test_scheduling_performance.py"
70+
"test_protobuf_compatibility.py"
7271
],
7372
size = "medium",
7473
tags = ["exclusive", "medium_size_python_tests_a_to_j", "team:core"],
@@ -121,8 +120,10 @@ py_test_module_list(
121120
"test_multi_node_2.py",
122121
"test_multinode_failures.py",
123122
"test_multinode_failures_2.py",
123+
"test_multiprocessing.py",
124124
"test_object_assign_owner.py",
125125
"test_placement_group.py",
126+
"test_placement_group_2.py",
126127
"test_placement_group_3.py",
127128
"test_placement_group_4.py",
128129
"test_placement_group_5.py",
@@ -183,6 +184,7 @@ py_test_module_list(
183184
"test_cross_language.py",
184185
"test_environ.py",
185186
"test_raylet_output.py",
187+
"test_scheduling_performance.py",
186188
"test_get_or_create_actor.py",
187189
],
188190
size = "small",
@@ -294,7 +296,6 @@ py_test_module_list(
294296
"test_placement_group_mini_integration.py",
295297
"test_scheduling_2.py",
296298
"test_multi_node_3.py",
297-
"test_placement_group_2.py",
298299
],
299300
size = "large",
300301
tags = ["exclusive", "large_size_python_tests_shard_1", "team:core"],

python/ray/tests/conftest.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from tempfile import gettempdir
1616
from typing import List, Tuple
1717
from unittest import mock
18-
import signal
1918

2019
import pytest
2120

@@ -205,19 +204,10 @@ def _ray_start(**kwargs):
205204
init_kwargs.update(kwargs)
206205
# Start the Ray processes.
207206
address_info = ray.init("local", **init_kwargs)
208-
agent_pids = []
209-
for node in ray.nodes():
210-
agent_pids.append(int(node["AgentInfo"]["Pid"]))
211207

212208
yield address_info
213209
# The code after the yield will run as teardown code.
214210
ray.shutdown()
215-
# Make sure the agent process is dead.
216-
for pid in agent_pids:
217-
try:
218-
os.kill(pid, signal.SIGKILL)
219-
except Exception:
220-
pass
221211
# Delete the cluster address just in case.
222212
ray._private.utils.reset_ray_address()
223213

python/ray/tests/test_cli.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -834,9 +834,8 @@ def output_ready():
834834

835835
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported on Windows")
836836
def test_ray_status_multinode(ray_start_cluster):
837-
NODE_NUMBER = 4
838837
cluster = ray_start_cluster
839-
for _ in range(NODE_NUMBER):
838+
for _ in range(4):
840839
cluster.add_node(num_cpus=2)
841840
runner = CliRunner()
842841

@@ -851,12 +850,8 @@ def output_ready():
851850

852851
wait_for_condition(output_ready)
853852

854-
def check_result():
855-
result = runner.invoke(scripts.status, [])
856-
_check_output_via_pattern("test_ray_status_multinode.txt", result)
857-
return True
858-
859-
wait_for_condition(check_result)
853+
result = runner.invoke(scripts.status, [])
854+
_check_output_via_pattern("test_ray_status_multinode.txt", result)
860855

861856

862857
@pytest.mark.skipif(

0 commit comments

Comments
 (0)