Skip to content

Commit

Permalink
[Job Submission][refactor 1/N] Add AgentInfo to GCSNodeInfo (ray-proj…
Browse files Browse the repository at this point in the history
…ect#26302)

This is the first PR of ray-project#25963 :
1. Moved the agent information from `internal KV to `GCSNodeInfo`,
2. raylet registers itself after the agent process finished register.

Motivation:
Storing agent information in `internal KV` and registering nodes in GCS (write node information to `GCSNodeInfo`) are two asynchronous operations, which will bring some complex timing problems, especially after `raylet` failover

Signed-off-by: Stefan van der Kleij <s.vanderkleij@viroteq.com>
  • Loading branch information
Catch-Bull authored and Stefan van der Kleij committed Aug 18, 2022
1 parent b81a404 commit 6e9dd95
Show file tree
Hide file tree
Showing 23 changed files with 216 additions and 106 deletions.
20 changes: 8 additions & 12 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import argparse
import asyncio
import io
import json
import logging
import logging.handlers
import os
Expand All @@ -13,11 +12,10 @@
import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.utils as dashboard_utils
import ray.experimental.internal_kv as internal_kv
from ray._private.gcs_pubsub import GcsAioPublisher, GcsPublisher
from ray._private.gcs_utils import GcsAioClient, GcsClient
from ray._private.ray_logging import setup_component_logger
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc, common_pb2
from ray.experimental.internal_kv import (
_initialize_internal_kv,
_internal_kv_initialized,
Expand Down Expand Up @@ -262,22 +260,20 @@ async def _check_parent():
# TODO: Use async version if performance is an issue
# -1 should indicate that http server is not started.
http_port = -1 if not self.http_server else self.http_server.http_port
internal_kv._internal_kv_put(
f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}",
json.dumps([http_port, self.grpc_port]),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)

# Register agent to agent manager.
raylet_stub = agent_manager_pb2_grpc.AgentManagerServiceStub(
self.aiogrpc_raylet_channel
)

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_id=self.agent_id,
agent_port=self.grpc_port,
agent_ip_address=self.ip,
agent_info=common_pb2.AgentInfo(
id=self.agent_id,
pid=os.getpid(),
grpc_port=self.grpc_port,
http_port=http_port,
ip_address=self.ip,
)
)
)

Expand Down
1 change: 0 additions & 1 deletion dashboard/consts.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from ray._private.ray_constants import env_integer

DASHBOARD_LOG_FILENAME = "dashboard.log"
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2
RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME = "RAY_STATE_SERVER_MAX_HTTP_REQUEST"
Expand Down
23 changes: 5 additions & 18 deletions dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import asyncio
import json
import logging
import re
import time

import aiohttp.web

import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray._private import ray_constants
Expand Down Expand Up @@ -131,10 +129,9 @@ async def _update_nodes(self):
try:
nodes = await self._get_nodes()

alive_node_ids = []
alive_node_infos = []
node_id_to_ip = {}
node_id_to_hostname = {}
agents = dict(DataSource.agents)
for node in nodes.values():
node_id = node["nodeId"]
ip = node["nodeManagerAddress"]
Expand All @@ -150,20 +147,10 @@ async def _update_nodes(self):
node_id_to_hostname[node_id] = hostname
assert node["state"] in ["ALIVE", "DEAD"]
if node["state"] == "ALIVE":
alive_node_ids.append(node_id)
alive_node_infos.append(node)

agents = dict(DataSource.agents)
for node_id in alive_node_ids:
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}" f"{node_id}"
# TODO: Use async version if performance is an issue
agent_port = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
if agent_port:
agents[node_id] = json.loads(agent_port)
for node_id in agents.keys() - set(alive_node_ids):
agents.pop(node_id, None)
agents[node_id] = [
node["agentInfo"]["httpPort"],
node["agentInfo"]["grpcPort"],
]

DataSource.node_id_to_ip.reset(node_id_to_ip)
DataSource.node_id_to_hostname.reset(node_id_to_hostname)
Expand Down
30 changes: 30 additions & 0 deletions dashboard/modules/node/tests/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import traceback
import random
import pytest
import psutil
import ray
import threading
from datetime import datetime, timedelta
Expand All @@ -18,6 +19,7 @@
wait_for_condition,
wait_until_succeeded_without_exception,
)
from ray._private.state import state


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -348,5 +350,33 @@ def verify():
wait_for_condition(verify, timeout=15)


# See detail: https://github.com/ray-project/ray/issues/24361
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_node_register_with_agent(ray_start_cluster_head):
def test_agent_port(pid, port):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")

for c in p.connections():
if c.status == psutil.CONN_LISTEN and c.laddr.port == port:
return
assert False

def test_agent_process(pid):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")

for node_info in state.node_table():
agent_info = node_info["AgentInfo"]
assert agent_info["IpAddress"] == node_info["NodeManagerAddress"]
test_agent_port(agent_info["Pid"], agent_info["GrpcPort"])
if agent_info["HttpPort"] >= 0:
test_agent_port(agent_info["Pid"], agent_info["HttpPort"])
else:
# Port conflicts may be caused that the previous
# test did not kill the agent cleanly
assert agent_info["HttpPort"] == -1


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
7 changes: 0 additions & 7 deletions dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def test_basic(ray_start_with_dashboard):
"""Dashboard test that starts a Ray cluster with a dashboard server running,
then hits the dashboard API and asserts that it receives sensible data."""
address_info = ray_start_with_dashboard
node_id = address_info["node_id"]
gcs_client = make_gcs_client(address_info)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)

Expand Down Expand Up @@ -143,11 +142,6 @@ def test_basic(ray_start_with_dashboard):
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
assert dashboard_rpc_address is not None
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
agent_ports = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
assert agent_ports is not None


def test_raylet_and_agent_share_fate(shutdown_only):
Expand Down Expand Up @@ -792,7 +786,6 @@ def test_dashboard_port_conflict(ray_start_with_dashboard):
)
def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True

all_processes = ray._private.worker._global_node.all_processes
dashboard_info = all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][0]
dashboard_proc = psutil.Process(dashboard_info.process.pid)
Expand Down
6 changes: 6 additions & 0 deletions python/ray/_private/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ def node_table(self):
"RayletSocketName": item.raylet_socket_name,
"MetricsExportPort": item.metrics_export_port,
"NodeName": item.node_name,
"AgentInfo": {
"IpAddress": item.agent_info.ip_address,
"GrpcPort": item.agent_info.grpc_port,
"HttpPort": item.agent_info.http_port,
"Pid": item.agent_info.pid,
},
}
node_info["alive"] = node_info["Alive"]
node_info["Resources"] = (
Expand Down
8 changes: 4 additions & 4 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ py_test_module_list(
"test_healthcheck.py",
"test_kill_raylet_signal_log.py",
"test_memstat.py",
"test_protobuf_compatibility.py"
"test_protobuf_compatibility.py",
"test_scheduling_performance.py"
],
size = "medium",
tags = ["exclusive", "medium_size_python_tests_a_to_j", "team:core"],
Expand Down Expand Up @@ -120,7 +121,6 @@ py_test_module_list(
"test_multi_node_2.py",
"test_multinode_failures.py",
"test_multinode_failures_2.py",
"test_multiprocessing.py",
"test_object_assign_owner.py",
"test_placement_group.py",
"test_placement_group_2.py",
Expand All @@ -138,7 +138,6 @@ py_test_module_list(
"test_runtime_env_fork_process.py",
"test_serialization.py",
"test_shuffle.py",
"test_state_api.py",
"test_state_api_log.py",
"test_state_api_summary.py",
"test_stress.py",
Expand Down Expand Up @@ -186,7 +185,6 @@ py_test_module_list(
"test_cross_language.py",
"test_environ.py",
"test_raylet_output.py",
"test_scheduling_performance.py",
"test_get_or_create_actor.py",
],
size = "small",
Expand Down Expand Up @@ -265,6 +263,7 @@ py_test_module_list(
"test_chaos.py",
"test_reference_counting_2.py",
"test_exit_observability.py",
"test_state_api.py",
"test_usage_stats.py",
],
size = "large",
Expand Down Expand Up @@ -295,6 +294,7 @@ py_test_module_list(
"test_placement_group_mini_integration.py",
"test_scheduling_2.py",
"test_multi_node_3.py",
"test_multiprocessing.py",
],
size = "large",
tags = ["exclusive", "large_size_python_tests_shard_1", "team:core"],
Expand Down
10 changes: 10 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from tempfile import gettempdir
from typing import List, Tuple
from unittest import mock
import signal

import pytest

Expand Down Expand Up @@ -204,10 +205,19 @@ def _ray_start(**kwargs):
init_kwargs.update(kwargs)
# Start the Ray processes.
address_info = ray.init("local", **init_kwargs)
agent_pids = []
for node in ray.nodes():
agent_pids.append(int(node["AgentInfo"]["Pid"]))

yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()
# Make sure the agent process is dead.
for pid in agent_pids:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
# Delete the cluster address just in case.
ray._private.utils.reset_ray_address()

Expand Down
11 changes: 8 additions & 3 deletions python/ray/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,8 +834,9 @@ def output_ready():

@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported on Windows")
def test_ray_status_multinode(ray_start_cluster):
NODE_NUMBER = 4
cluster = ray_start_cluster
for _ in range(4):
for _ in range(NODE_NUMBER):
cluster.add_node(num_cpus=2)
runner = CliRunner()

Expand All @@ -850,8 +851,12 @@ def output_ready():

wait_for_condition(output_ready)

result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
def check_result():
result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
return True

wait_for_condition(check_result)


@pytest.mark.skipif(
Expand Down
32 changes: 28 additions & 4 deletions python/ray/tests/test_ray_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,25 @@


def get_all_ray_worker_processes():
processes = [
p.info["cmdline"] for p in psutil.process_iter(attrs=["pid", "name", "cmdline"])
]
processes = psutil.process_iter(attrs=["pid", "name", "cmdline"])

result = []
for p in processes:
if p is not None and len(p) > 0 and "ray::" in p[0]:
cmd_line = p.info["cmdline"]
if cmd_line is not None and len(cmd_line) > 0 and "ray::" in cmd_line[0]:
result.append(p)
return result


def kill_all_ray_worker_process():
ray_process = get_all_ray_worker_processes()
for p in ray_process:
try:
p.kill()
except Exception:
pass


@pytest.fixture
def short_gcs_publish_timeout(monkeypatch):
monkeypatch.setenv("RAY_MAX_GCS_PUBLISH_RETRIES", "3")
Expand All @@ -31,6 +39,10 @@ def short_gcs_publish_timeout(monkeypatch):
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown(short_gcs_publish_timeout, shutdown_only):
"""Make sure all ray workers are shutdown when driver is done."""
# Avoiding the previous test doesn't kill the relevant process,
# thus making the current test fail.
kill_all_ray_worker_process()

ray.init()

@ray.remote
Expand All @@ -51,6 +63,10 @@ def f():
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_driver_dead(short_gcs_publish_timeout, shutdown_only):
"""Make sure all ray workers are shutdown when driver is killed."""
# Avoiding the previous test doesn't kill the relevant process,
# thus making the current test fail.
kill_all_ray_worker_process()

driver = """
import ray
ray.init(_system_config={"gcs_rpc_server_reconnect_timeout_s": 1})
Expand Down Expand Up @@ -80,6 +96,10 @@ def f():
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_node_killed(short_gcs_publish_timeout, ray_start_cluster):
"""Make sure all ray workers when nodes are dead."""
# Avoiding the previous test doesn't kill the relevant process,
# thus making the current test fail.
kill_all_ray_worker_process()

cluster = ray_start_cluster
# head node.
cluster.add_node(
Expand Down Expand Up @@ -112,6 +132,10 @@ def f():
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_head_node_down(short_gcs_publish_timeout, ray_start_cluster):
"""Make sure all ray workers when head node is dead."""
# Avoiding the previous test doesn't kill the relevant process,
# thus making the current test fail.
kill_all_ray_worker_process()

cluster = ray_start_cluster
# head node.
head = cluster.add_node(
Expand Down
Loading

0 comments on commit 6e9dd95

Please sign in to comment.