Revert "Revert "[Job Submission][refactor 1/N] Add AgentInfo to GCSNodeInfo (…" #27308

Merged: 5 commits, Aug 5, 2022
20 changes: 8 additions & 12 deletions dashboard/agent.py
@@ -1,7 +1,6 @@
import argparse
import asyncio
import io
import json
import logging
import logging.handlers
import os
@@ -13,11 +12,10 @@
import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.utils as dashboard_utils
import ray.experimental.internal_kv as internal_kv
from ray._private.gcs_pubsub import GcsAioPublisher, GcsPublisher
from ray._private.gcs_utils import GcsAioClient, GcsClient
from ray._private.ray_logging import setup_component_logger
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc, common_pb2
from ray.experimental.internal_kv import (
_initialize_internal_kv,
_internal_kv_initialized,
@@ -262,22 +260,20 @@ async def _check_parent():
# TODO: Use async version if performance is an issue
# -1 should indicate that http server is not started.
http_port = -1 if not self.http_server else self.http_server.http_port
internal_kv._internal_kv_put(
f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}",
json.dumps([http_port, self.grpc_port]),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)

# Register agent to agent manager.
raylet_stub = agent_manager_pb2_grpc.AgentManagerServiceStub(
self.aiogrpc_raylet_channel
)

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_id=self.agent_id,
agent_port=self.grpc_port,
agent_ip_address=self.ip,
agent_info=common_pb2.AgentInfo(
id=self.agent_id,
pid=os.getpid(),
grpc_port=self.grpc_port,
http_port=http_port,
ip_address=self.ip,
)
)
)

1 change: 0 additions & 1 deletion dashboard/consts.py
@@ -1,7 +1,6 @@
from ray._private.ray_constants import env_integer

DASHBOARD_LOG_FILENAME = "dashboard.log"
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2
RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME = "RAY_STATE_SERVER_MAX_HTTP_REQUEST"
23 changes: 5 additions & 18 deletions dashboard/modules/node/node_head.py
@@ -1,13 +1,11 @@
import asyncio
import json
import logging
import re
import time

import aiohttp.web

import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray._private import ray_constants
@@ -131,10 +129,9 @@ async def _update_nodes(self):
try:
nodes = await self._get_nodes()

alive_node_ids = []
alive_node_infos = []
node_id_to_ip = {}
node_id_to_hostname = {}
agents = dict(DataSource.agents)
for node in nodes.values():
node_id = node["nodeId"]
ip = node["nodeManagerAddress"]
@@ -150,20 +147,10 @@
node_id_to_hostname[node_id] = hostname
assert node["state"] in ["ALIVE", "DEAD"]
if node["state"] == "ALIVE":
alive_node_ids.append(node_id)
alive_node_infos.append(node)

agents = dict(DataSource.agents)
for node_id in alive_node_ids:
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}" f"{node_id}"
# TODO: Use async version if performance is an issue
agent_port = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
if agent_port:
agents[node_id] = json.loads(agent_port)
for node_id in agents.keys() - set(alive_node_ids):
agents.pop(node_id, None)
agents[node_id] = [
node["agentInfo"]["httpPort"],
node["agentInfo"]["grpcPort"],
]

DataSource.node_id_to_ip.reset(node_id_to_ip)
DataSource.node_id_to_hostname.reset(node_id_to_hostname)
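Note on the node_head.py change above: agent ports are now taken straight from the per-node agentInfo returned by the GCS, instead of a per-node internal-KV lookup. A minimal sketch of that lookup in isolation (the node-dict keys follow the diff; the helper name is made up for illustration):

```python
# Hypothetical helper mirroring the new _update_nodes() logic: it assumes each
# node dict carries the "agentInfo" field that this PR adds to GCSNodeInfo.
from typing import Dict, List

def build_agents(nodes: Dict[str, dict]) -> Dict[str, List[int]]:
    agents = {}
    for node in nodes.values():
        if node["state"] == "ALIVE":
            agents[node["nodeId"]] = [
                node["agentInfo"]["httpPort"],
                node["agentInfo"]["grpcPort"],
            ]
    return agents
```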
30 changes: 30 additions & 0 deletions dashboard/modules/node/tests/test_node.py
@@ -6,6 +6,7 @@
import traceback
import random
import pytest
import psutil
import ray
import threading
from datetime import datetime, timedelta
@@ -18,6 +19,7 @@
wait_for_condition,
wait_until_succeeded_without_exception,
)
from ray._private.state import state


logger = logging.getLogger(__name__)
@@ -348,5 +350,33 @@ def verify():
wait_for_condition(verify, timeout=15)


# See detail: https://github.com/ray-project/ray/issues/24361
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_node_register_with_agent(ray_start_cluster_head):
def test_agent_port(pid, port):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")

for c in p.connections():
if c.status == psutil.CONN_LISTEN and c.laddr.port == port:
return
assert False

def test_agent_process(pid):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")

for node_info in state.node_table():
agent_info = node_info["AgentInfo"]
assert agent_info["IpAddress"] == node_info["NodeManagerAddress"]
test_agent_port(agent_info["Pid"], agent_info["GrpcPort"])
if agent_info["HttpPort"] >= 0:
test_agent_port(agent_info["Pid"], agent_info["HttpPort"])
else:
# A port conflict (e.g. because a previous test did not kill its
# agent cleanly) is reported as an http port of -1.
assert agent_info["HttpPort"] == -1


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
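The test above verifies that the agent pid reported in the node table is a dashboard agent process listening on the advertised ports. A standalone sketch of that port check, assuming only that psutil is installed (the helper name is illustrative):

```python
import psutil

def is_listening(pid: int, port: int) -> bool:
    """Return True if the process with this pid has a LISTEN socket on port."""
    proc = psutil.Process(pid)
    return any(
        conn.status == psutil.CONN_LISTEN and conn.laddr.port == port
        for conn in proc.connections()
    )
```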
7 changes: 0 additions & 7 deletions dashboard/tests/test_dashboard.py
@@ -110,7 +110,6 @@ def test_basic(ray_start_with_dashboard):
"""Dashboard test that starts a Ray cluster with a dashboard server running,
then hits the dashboard API and asserts that it receives sensible data."""
address_info = ray_start_with_dashboard
node_id = address_info["node_id"]
gcs_client = make_gcs_client(address_info)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)

@@ -146,11 +145,6 @@
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
assert dashboard_rpc_address is not None
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
agent_ports = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
assert agent_ports is not None


def test_raylet_and_agent_share_fate(shutdown_only):
@@ -795,7 +789,6 @@ def test_dashboard_port_conflict(ray_start_with_dashboard):
)
def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True

all_processes = ray._private.worker._global_node.all_processes
dashboard_info = all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][0]
dashboard_proc = psutil.Process(dashboard_info.process.pid)
6 changes: 6 additions & 0 deletions python/ray/_private/state.py
@@ -164,6 +164,12 @@ def node_table(self):
"RayletSocketName": item.raylet_socket_name,
"MetricsExportPort": item.metrics_export_port,
"NodeName": item.node_name,
"AgentInfo": {
"IpAddress": item.agent_info.ip_address,
"GrpcPort": item.agent_info.grpc_port,
"HttpPort": item.agent_info.http_port,
"Pid": item.agent_info.pid,
},
}
node_info["alive"] = node_info["Alive"]
node_info["Resources"] = (
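With the state.py addition above, agent information is exposed through the regular node table rather than the dashboard's internal KV entries. A hedged usage sketch (assuming a build that includes this PR; the key names follow node_table() in the diff):

```python
import ray

ray.init()
for node in ray.nodes():
    agent = node["AgentInfo"]
    print(node["NodeID"], agent["Pid"], agent["GrpcPort"], agent["HttpPort"])
ray.shutdown()
```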
7 changes: 6 additions & 1 deletion python/ray/_private/worker.py
@@ -1,6 +1,7 @@
import atexit
import faulthandler
import functools
import grpc
import hashlib
import inspect
import io
@@ -756,7 +757,6 @@ def sigterm_handler(signum, frame):

def print_logs(self):
"""Prints log messages from workers on all nodes in the same job."""
import grpc

subscriber = self.gcs_log_subscriber
subscriber.subscribe()
@@ -1902,6 +1902,11 @@ def connect(
if mode == SCRIPT_MODE:
raise e
elif mode == WORKER_MODE:
if isinstance(e, grpc.RpcError) and e.code() in (
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.UNKNOWN,
):
raise e
Review comment (Contributor): Add a comment here.

Reply (Contributor Author): If node.check_version_info raises grpc.RpcError, the GCS is unreachable, so ray._private.utils.publish_error_to_driver would fail as well; raising the exception early avoids waiting for the publish timeout, which would otherwise waste about 60 seconds for nothing.

traceback_str = traceback.format_exc()
ray._private.utils.publish_error_to_driver(
ray_constants.VERSION_MISMATCH_PUSH_ERROR,
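The review exchange above explains why the worker re-raises GCS-unreachable errors instead of trying to publish them first. A minimal standalone sketch of that check (it assumes, as the diff does, that the caught grpc.RpcError exposes .code()):

```python
import grpc

def should_fail_fast(err: Exception) -> bool:
    # If the GCS itself is unreachable, publishing the error would also hang
    # until the publish timeout (about 60 s), so re-raise immediately instead.
    return isinstance(err, grpc.RpcError) and err.code() in (
        grpc.StatusCode.UNAVAILABLE,
        grpc.StatusCode.UNKNOWN,
    )
```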
7 changes: 3 additions & 4 deletions python/ray/tests/BUILD
@@ -68,7 +68,8 @@ py_test_module_list(
"test_healthcheck.py",
"test_kill_raylet_signal_log.py",
"test_memstat.py",
"test_protobuf_compatibility.py"
"test_protobuf_compatibility.py",
"test_scheduling_performance.py"
],
size = "medium",
tags = ["exclusive", "medium_size_python_tests_a_to_j", "team:core"],
@@ -120,10 +121,8 @@ py_test_module_list(
"test_multi_node_2.py",
"test_multinode_failures.py",
"test_multinode_failures_2.py",
"test_multiprocessing.py",
"test_object_assign_owner.py",
"test_placement_group.py",
"test_placement_group_2.py",
"test_placement_group_3.py",
"test_placement_group_4.py",
"test_placement_group_5.py",
@@ -184,7 +183,6 @@ py_test_module_list(
"test_cross_language.py",
"test_environ.py",
"test_raylet_output.py",
"test_scheduling_performance.py",
"test_get_or_create_actor.py",
],
size = "small",
@@ -295,6 +293,7 @@ py_test_module_list(
"test_placement_group_mini_integration.py",
"test_scheduling_2.py",
"test_multi_node_3.py",
"test_placement_group_2.py",
Review comment (Contributor Author): This PR makes cluster.add_node take about one second longer than before. There are nearly 30 cluster.add_node calls in test_placement_group_2.py, and the test already took about 240 s before this PR, so it seems reasonable to increase the time limit of test_placement_group_2 to avoid flaky tests.

Reply (Contributor): Regarding "cluster.add_node takes 1 second more than before": this actually sounds pretty bad (I feel like it will increase the runtime of the tests too much). Why is this the case?

],
size = "large",
tags = ["exclusive", "large_size_python_tests_shard_1", "team:core"],
10 changes: 10 additions & 0 deletions python/ray/tests/conftest.py
@@ -15,6 +15,7 @@
from tempfile import gettempdir
from typing import List, Tuple
from unittest import mock
import signal

import pytest

@@ -204,10 +205,19 @@ def _ray_start(**kwargs):
init_kwargs.update(kwargs)
# Start the Ray processes.
address_info = ray.init("local", **init_kwargs)
agent_pids = []
for node in ray.nodes():
agent_pids.append(int(node["AgentInfo"]["Pid"]))

yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()
# Make sure the agent process is dead.
for pid in agent_pids:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
# Delete the cluster address just in case.
ray._private.utils.reset_ray_address()

11 changes: 8 additions & 3 deletions python/ray/tests/test_cli.py
@@ -834,8 +834,9 @@ def output_ready():

@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported on Windows")
def test_ray_status_multinode(ray_start_cluster):
NODE_NUMBER = 4
cluster = ray_start_cluster
for _ in range(4):
for _ in range(NODE_NUMBER):
cluster.add_node(num_cpus=2)
runner = CliRunner()

@@ -850,8 +851,12 @@ def output_ready():

wait_for_condition(output_ready)

result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
def check_result():
result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
return True

wait_for_condition(check_result)


@pytest.mark.skipif(