Skip to content

Conversation

@Sparks0219
Copy link
Contributor

@Sparks0219 Sparks0219 commented Jan 7, 2026

This reverts commit 7198193.

Thank you for contributing to Ray! 🚀
Please review the Ray Contribution Guide before opening a pull request.

⚠️ Remove these instructions before submitting your PR.

💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete.

Description

Briefly describe what this PR accomplishes and why it's needed.

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request reverts the functionality for publishing events from the aggregator to GCS. The changes primarily involve removing the GCS publisher client, related configurations, and tests.

Alongside the revert, there's a beneficial refactoring of the event filtering logic. Previously, filtering was handled within each publisher client. This has been centralized into the AggregatorAgent, which now passes a filter function to the AsyncHttpPublisherClient. This improves modularity and removes code duplication.

The code removal appears to be clean and complete. I've identified a bug in one of the updated tests and provided a suggestion for a fix.

Comment on lines +560 to 577
def test_case_publisher_specific_metrics_correct(publisher_name: str):
fetch_prometheus_timeseries(prom_addresses, timeseries)
metric_samples = timeseries.metric_samples.values()
expected_metrics_values = {
"ray_aggregator_agent_published_events_total": 1.0,
"ray_aggregator_agent_filtered_events_total": 1.0,
"ray_aggregator_agent_queue_dropped_events_total": 1.0,
}
for descriptor, expected_value in expected_metrics_values.items():
samples = [
m
for m in metric_samples
if m.name == descriptor and m.labels[CONSUMER_TAG_KEY] == consumer_name
]
samples = [m for m in metric_samples if m.name == descriptor]
if not samples:
return False
if samples[0].value != expected_value:
if (
samples[0].value != expected_value
or samples[0].labels[CONSUMER_TAG_KEY] != publisher_name
):
return False
return True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This test function has a bug. It attempts to check for the CONSUMER_TAG_KEY on all metrics, including ray_aggregator_agent_queue_dropped_events_total, which is a global metric and does not have this tag. This will cause a KeyError.

To fix this, ray_aggregator_agent_queue_dropped_events_total should be removed from this test and verified separately with other global metrics. Additionally, the logic for finding samples should be more robust by filtering on the consumer tag directly and using .get() to avoid errors.

Suggested change
def test_case_publisher_specific_metrics_correct(publisher_name: str):
fetch_prometheus_timeseries(prom_addresses, timeseries)
metric_samples = timeseries.metric_samples.values()
expected_metrics_values = {
"ray_aggregator_agent_published_events_total": 1.0,
"ray_aggregator_agent_filtered_events_total": 1.0,
"ray_aggregator_agent_queue_dropped_events_total": 1.0,
}
for descriptor, expected_value in expected_metrics_values.items():
samples = [
m
for m in metric_samples
if m.name == descriptor and m.labels[CONSUMER_TAG_KEY] == consumer_name
]
samples = [m for m in metric_samples if m.name == descriptor]
if not samples:
return False
if samples[0].value != expected_value:
if (
samples[0].value != expected_value
or samples[0].labels[CONSUMER_TAG_KEY] != publisher_name
):
return False
return True
def test_case_publisher_specific_metrics_correct(publisher_name: str):
fetch_prometheus_timeseries(prom_addresses, timeseries)
metric_samples = timeseries.metric_samples.values()
expected_metrics_values = {
"ray_aggregator_agent_published_events_total": 1.0,
"ray_aggregator_agent_filtered_events_total": 1.0,
}
for descriptor, expected_value in expected_metrics_values.items():
samples = [
m
for m in metric_samples
if m.name == descriptor
and m.labels.get(CONSUMER_TAG_KEY) == publisher_name
]
if not samples:
return False
if samples[0].value != expected_value:
return False
return True

…project#55781)"

This reverts commit 7198193.

Signed-off-by: joshlee <joshlee@anyscale.com>
@Sparks0219 Sparks0219 force-pushed the joshlee/find-memleak-in-test-scheduling branch from 7f8e30a to c36e18f Compare January 7, 2026 21:33
@Sparks0219 Sparks0219 marked this pull request as ready for review January 7, 2026 21:34
@Sparks0219 Sparks0219 requested a review from a team as a code owner January 7, 2026 21:34
@Sparks0219 Sparks0219 added the go add ONLY when ready to merge, run all tests label Jan 7, 2026
@edoakes edoakes enabled auto-merge (squash) January 7, 2026 22:52
@ray-gardener ray-gardener bot added the core Issues that should be addressed in Ray Core label Jan 8, 2026
@Sparks0219
Copy link
Contributor Author

Fixed by #59965

@Sparks0219 Sparks0219 closed this Jan 8, 2026
auto-merge was automatically disabled January 8, 2026 18:57

Pull request was closed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Issues that should be addressed in Ray Core go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

2 participants