[Job Submission][refactor 1/N] Add AgentInfo to GCSNodeInfo #26302

Merged
Commits (30)
e37f022
Add AgentInfo to GCSNodeInfo
Catch-Bull Jul 5, 2022
8b81ad9
fix UT
Catch-Bull Jul 6, 2022
fe80cf7
fix UT
Catch-Bull Jul 6, 2022
55a8a3c
Revert "Add AgentInfo to GCSNodeInfo"
Catch-Bull Jul 11, 2022
9b17685
save
Catch-Bull Jul 11, 2022
004051f
fix lint
Catch-Bull Jul 11, 2022
ae54ecb
save
Catch-Bull Jul 12, 2022
8a33223
fix UT
Catch-Bull Jul 12, 2022
308bb76
fix
Catch-Bull Jul 12, 2022
15ccfa2
fix UT
Catch-Bull Jul 12, 2022
5fe17d1
fix UT
Catch-Bull Jul 12, 2022
cfdea33
fix by comment
Catch-Bull Jul 15, 2022
595fc17
fix
Catch-Bull Jul 18, 2022
e3b608c
add notes
Catch-Bull Jul 18, 2022
b523908
fix lint
Catch-Bull Jul 18, 2022
5b057a2
fix by comment
Catch-Bull Jul 19, 2022
957753e
fix UT
Catch-Bull Jul 19, 2022
d2ce940
Merge branch 'master' of github.com:ray-project/ray into add_agent_ad…
Catch-Bull Jul 20, 2022
0623f32
fix
Catch-Bull Jul 20, 2022
4d62c89
fix
Catch-Bull Jul 20, 2022
5662d59
Merge branch 'add_agent_address_to_node_table' of github.com:alipay/r…
Catch-Bull Jul 22, 2022
96f61ec
fix lint
Catch-Bull Jul 22, 2022
f9855db
Merge branch 'master' of github.com:ray-project/ray into add_agent_ad…
Catch-Bull Jul 26, 2022
1d90fa6
set test_state_api to large
Catch-Bull Jul 26, 2022
88ef5c2
Merge branch 'master' of github.com:ray-project/ray into add_agent_ad…
Catch-Bull Jul 27, 2022
158a744
fix test_ray_shutdown
Catch-Bull Jul 27, 2022
9f6bbc9
fix lint
Catch-Bull Jul 27, 2022
1fe9eed
Merge branch 'master' of github.com:ray-project/ray into add_agent_ad…
Catch-Bull Jul 28, 2022
5cd1d6e
fix lint
Catch-Bull Jul 28, 2022
99e930a
fix test_multiprocessing
Catch-Bull Jul 28, 2022
20 changes: 8 additions & 12 deletions dashboard/agent.py
@@ -1,7 +1,6 @@
import argparse
import asyncio
import io
import json
import logging
import logging.handlers
import os
@@ -13,11 +12,10 @@
import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.utils as dashboard_utils
import ray.experimental.internal_kv as internal_kv
from ray._private.gcs_pubsub import GcsAioPublisher, GcsPublisher
from ray._private.gcs_utils import GcsAioClient, GcsClient
from ray._private.ray_logging import setup_component_logger
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc, common_pb2
from ray.experimental.internal_kv import (
_initialize_internal_kv,
_internal_kv_initialized,
@@ -262,22 +260,20 @@ async def _check_parent():
# TODO: Use async version if performance is an issue
# -1 should indicate that http server is not started.
http_port = -1 if not self.http_server else self.http_server.http_port
internal_kv._internal_kv_put(
f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}",
json.dumps([http_port, self.grpc_port]),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)

# Register agent to agent manager.
raylet_stub = agent_manager_pb2_grpc.AgentManagerServiceStub(
self.aiogrpc_raylet_channel
)

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_id=self.agent_id,
agent_port=self.grpc_port,
agent_ip_address=self.ip,
agent_info=common_pb2.AgentInfo(
id=self.agent_id,
pid=os.getpid(),
grpc_port=self.grpc_port,
http_port=http_port,
ip_address=self.ip,
)
)
)

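For reference, a minimal standalone sketch of the payload this registration now carries. The field names mirror the RegisterAgent call above; the concrete values are placeholders, and GcsNodeInfo gaining an agent_info field is inferred from the PR title rather than shown in this file's diff:

from ray.core.generated import common_pb2, gcs_pb2

# All values below are hypothetical, for illustration only.
agent_info = common_pb2.AgentInfo(
    id=0,                    # agent id assigned at startup
    pid=12345,               # pid of the dashboard agent process
    grpc_port=52365,
    http_port=-1,            # -1 means the HTTP server was not started
    ip_address="127.0.0.1",
)

# After this PR the raylet embeds the registered AgentInfo in its
# GCS node table entry, so the ports travel with the node record.
node_info = gcs_pb2.GcsNodeInfo(agent_info=agent_info)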
1 change: 0 additions & 1 deletion dashboard/consts.py
@@ -1,7 +1,6 @@
from ray._private.ray_constants import env_integer

DASHBOARD_LOG_FILENAME = "dashboard.log"
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2
RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME = "RAY_STATE_SERVER_MAX_HTTP_REQUEST"
23 changes: 5 additions & 18 deletions dashboard/modules/node/node_head.py
@@ -1,13 +1,11 @@
import asyncio
import json
import logging
import re
import time

import aiohttp.web

import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray._private import ray_constants
@@ -131,10 +129,9 @@ async def _update_nodes(self):
try:
nodes = await self._get_nodes()

alive_node_ids = []
alive_node_infos = []
node_id_to_ip = {}
node_id_to_hostname = {}
agents = dict(DataSource.agents)
for node in nodes.values():
node_id = node["nodeId"]
ip = node["nodeManagerAddress"]
@@ -150,20 +147,10 @@
node_id_to_hostname[node_id] = hostname
assert node["state"] in ["ALIVE", "DEAD"]
if node["state"] == "ALIVE":
alive_node_ids.append(node_id)
alive_node_infos.append(node)

agents = dict(DataSource.agents)
for node_id in alive_node_ids:
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}" f"{node_id}"
# TODO: Use async version if performance is an issue
agent_port = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
if agent_port:
agents[node_id] = json.loads(agent_port)
for node_id in agents.keys() - set(alive_node_ids):
agents.pop(node_id, None)
agents[node_id] = [
node["agentInfo"]["httpPort"],
node["agentInfo"]["grpcPort"],
]

DataSource.node_id_to_ip.reset(node_id_to_ip)
DataSource.node_id_to_hostname.reset(node_id_to_hostname)
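Net effect of this change: the dashboard no longer polls internal KV for agent ports on every update; the ports arrive on the node records it already fetches. A condensed sketch of the new logic in _update_nodes, assuming nodes is the node-ID-keyed dict returned by self._get_nodes() above:

agents = dict(DataSource.agents)
for node in nodes.values():
    if node["state"] == "ALIVE":
        # Ports now come straight from the GCS node record instead of
        # a KV lookup keyed by DASHBOARD_AGENT_PORT_PREFIX.
        agents[node["nodeId"]] = [
            node["agentInfo"]["httpPort"],
            node["agentInfo"]["grpcPort"],
        ]
# DataSource.agents is then refreshed from this dict, like the
# node_id_to_ip / node_id_to_hostname maps above.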
30 changes: 30 additions & 0 deletions dashboard/modules/node/tests/test_node.py
@@ -6,6 +6,7 @@
import traceback
import random
import pytest
import psutil
import ray
import threading
from datetime import datetime, timedelta
@@ -18,6 +19,7 @@
wait_for_condition,
wait_until_succeeded_without_exception,
)
from ray._private.state import state


logger = logging.getLogger(__name__)
@@ -348,5 +350,33 @@ def verify():
wait_for_condition(verify, timeout=15)


# See detail: https://github.com/ray-project/ray/issues/24361
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_node_register_with_agent(ray_start_cluster_head):
def test_agent_port(pid, port):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")

for c in p.connections():
if c.status == psutil.CONN_LISTEN and c.laddr.port == port:
return
assert False

def test_agent_process(pid):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")

for node_info in state.node_table():
agent_info = node_info["AgentInfo"]
assert agent_info["IpAddress"] == node_info["NodeManagerAddress"]
test_agent_port(agent_info["Pid"], agent_info["GrpcPort"])
if agent_info["HttpPort"] >= 0:
test_agent_port(agent_info["Pid"], agent_info["HttpPort"])
else:
# A port conflict can occur if the previous
# test did not kill the agent cleanly.
assert agent_info["HttpPort"] == -1


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
7 changes: 0 additions & 7 deletions dashboard/tests/test_dashboard.py
@@ -107,7 +106,6 @@ def test_basic(ray_start_with_dashboard):
"""Dashboard test that starts a Ray cluster with a dashboard server running,
then hits the dashboard API and asserts that it receives sensible data."""
address_info = ray_start_with_dashboard
node_id = address_info["node_id"]
gcs_client = make_gcs_client(address_info)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)

@@ -143,11 +142,6 @@ def test_basic(ray_start_with_dashboard):
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
assert dashboard_rpc_address is not None
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
agent_ports = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
assert agent_ports is not None


def test_raylet_and_agent_share_fate(shutdown_only):
@@ -792,7 +786,6 @@ def test_dashboard_port_conflict(ray_start_with_dashboard):
)
def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True

all_processes = ray._private.worker._global_node.all_processes
dashboard_info = all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][0]
dashboard_proc = psutil.Process(dashboard_info.process.pid)
6 changes: 6 additions & 0 deletions python/ray/_private/state.py
@@ -164,6 +164,12 @@ def node_table(self):
"RayletSocketName": item.raylet_socket_name,
"MetricsExportPort": item.metrics_export_port,
"NodeName": item.node_name,
"AgentInfo": {
"IpAddress": item.agent_info.ip_address,
"GrpcPort": item.agent_info.grpc_port,
"HttpPort": item.agent_info.http_port,
"Pid": item.agent_info.pid,
},
}
node_info["alive"] = node_info["Alive"]
node_info["Resources"] = (
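A short usage sketch for the new field; this goes through the private ray._private.state module, as the test above does, so it is illustrative rather than a public API:

import ray
from ray._private.state import state

ray.init()
for node in state.node_table():
    agent = node["AgentInfo"]
    # HttpPort is -1 when the agent's HTTP server was not started.
    print(agent["IpAddress"], agent["Pid"], agent["GrpcPort"], agent["HttpPort"])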
8 changes: 4 additions & 4 deletions python/ray/tests/BUILD
@@ -68,7 +68,8 @@ py_test_module_list(
"test_healthcheck.py",
"test_kill_raylet_signal_log.py",
"test_memstat.py",
"test_protobuf_compatibility.py"
"test_protobuf_compatibility.py",
"test_scheduling_performance.py"
Comment from the contributor author:
After this PR, a node must wait for its agent to register before registering itself to the GCS, so cluster.add_node becomes nearly one second slower; that adds roughly 16 * 2 = 32 seconds of total runtime, so the test size is set to medium.

],
size = "medium",
tags = ["exclusive", "medium_size_python_tests_a_to_j", "team:core"],
@@ -120,7 +121,6 @@ py_test_module_list(
"test_multi_node_2.py",
"test_multinode_failures.py",
"test_multinode_failures_2.py",
"test_multiprocessing.py",
"test_object_assign_owner.py",
"test_placement_group.py",
"test_placement_group_2.py",
@@ -138,7 +138,6 @@
"test_runtime_env_fork_process.py",
"test_serialization.py",
"test_shuffle.py",
"test_state_api.py",
"test_state_api_log.py",
"test_state_api_summary.py",
"test_stress.py",
@@ -186,7 +185,6 @@ py_test_module_list(
"test_cross_language.py",
"test_environ.py",
"test_raylet_output.py",
"test_scheduling_performance.py",
"test_get_or_create_actor.py",
],
size = "small",
@@ -265,6 +263,7 @@ py_test_module_list(
"test_chaos.py",
"test_reference_counting_2.py",
"test_exit_observability.py",
"test_state_api.py",
"test_usage_stats.py",
],
size = "large",
@@ -295,6 +294,7 @@ py_test_module_list(
"test_placement_group_mini_integration.py",
"test_scheduling_2.py",
"test_multi_node_3.py",
"test_multiprocessing.py",
],
size = "large",
tags = ["exclusive", "large_size_python_tests_shard_1", "team:core"],
10 changes: 10 additions & 0 deletions python/ray/tests/conftest.py
@@ -15,6 +15,7 @@
from tempfile import gettempdir
from typing import List, Tuple
from unittest import mock
import signal

import pytest

@@ -204,10 +205,19 @@ def _ray_start(**kwargs):
init_kwargs.update(kwargs)
# Start the Ray processes.
address_info = ray.init("local", **init_kwargs)
agent_pids = []
for node in ray.nodes():
agent_pids.append(int(node["AgentInfo"]["Pid"]))

yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()
# Make sure the agent process is dead.
for pid in agent_pids:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
# Delete the cluster address just in case.
ray._private.utils.reset_ray_address()

11 changes: 8 additions & 3 deletions python/ray/tests/test_cli.py
@@ -834,8 +834,9 @@ def output_ready():

@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported on Windows")
def test_ray_status_multinode(ray_start_cluster):
NODE_NUMBER = 4
cluster = ray_start_cluster
for _ in range(4):
for _ in range(NODE_NUMBER):
cluster.add_node(num_cpus=2)
runner = CliRunner()

@@ -850,8 +851,12 @@ def output_ready():

wait_for_condition(output_ready)

result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
def check_result():
result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
return True

wait_for_condition(check_result)


@pytest.mark.skipif(
32 changes: 28 additions & 4 deletions python/ray/tests/test_ray_shutdown.py
@@ -11,17 +11,25 @@


def get_all_ray_worker_processes():
processes = [
p.info["cmdline"] for p in psutil.process_iter(attrs=["pid", "name", "cmdline"])
]
processes = psutil.process_iter(attrs=["pid", "name", "cmdline"])

result = []
for p in processes:
if p is not None and len(p) > 0 and "ray::" in p[0]:
cmd_line = p.info["cmdline"]
if cmd_line is not None and len(cmd_line) > 0 and "ray::" in cmd_line[0]:
result.append(p)
return result


def kill_all_ray_worker_process():
ray_process = get_all_ray_worker_processes()
for p in ray_process:
try:
p.kill()
except Exception:
pass


@pytest.fixture
def short_gcs_publish_timeout(monkeypatch):
monkeypatch.setenv("RAY_MAX_GCS_PUBLISH_RETRIES", "3")
@@ -31,6 +39,10 @@ def short_gcs_publish_timeout(monkeypatch):
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown(short_gcs_publish_timeout, shutdown_only):
"""Make sure all ray workers are shutdown when driver is done."""
# Kill any worker processes left over from a previous test that
# did not clean up; otherwise they can make the current test fail.
kill_all_ray_worker_process()

ray.init()

@ray.remote
@@ -51,6 +63,10 @@ def f():
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_driver_dead(short_gcs_publish_timeout, shutdown_only):
"""Make sure all ray workers are shutdown when driver is killed."""
# Kill any worker processes left over from a previous test that
# did not clean up; otherwise they can make the current test fail.
kill_all_ray_worker_process()

driver = """
import ray
ray.init(_system_config={"gcs_rpc_server_reconnect_timeout_s": 1})
@@ -80,6 +96,10 @@ def f():
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_node_killed(short_gcs_publish_timeout, ray_start_cluster):
"""Make sure all ray workers when nodes are dead."""
# Kill any worker processes left over from a previous test that
# did not clean up; otherwise they can make the current test fail.
kill_all_ray_worker_process()

cluster = ray_start_cluster
# head node.
cluster.add_node(
@@ -112,6 +132,10 @@ def f():
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_head_node_down(short_gcs_publish_timeout, ray_start_cluster):
"""Make sure all ray workers when head node is dead."""
# Kill any worker processes left over from a previous test that
# did not clean up; otherwise they can make the current test fail.
kill_all_ray_worker_process()

cluster = ray_start_cluster
# head node.
head = cluster.add_node(