148 changes: 95 additions & 53 deletions grpc/src/client/load_balancing/child_manager.rs
@@ -37,7 +37,7 @@
ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState,
WeakSubchannel, WorkScheduler,
};
use crate::client::name_resolution::{Address, ResolverUpdate};
use crate::client::name_resolution::{Address, Endpoint, ResolverUpdate};
use crate::client::ConnectivityState;
use crate::rt::Runtime;

@@ -50,6 +50,7 @@
update_sharder: Box<dyn ResolverUpdateSharder<T>>,
pending_work: Arc<Mutex<HashSet<usize>>>,
runtime: Arc<dyn Runtime>,
updated: bool,
}

struct Child<T> {
@@ -81,6 +82,47 @@
) -> Result<Box<dyn Iterator<Item = ChildUpdate<T>>>, Box<dyn Error + Send + Sync>>;
}

/// EndpointSharder shards a resolver update into individual endpoints,
/// with each endpoint serving as the unique identifier for a child.
///
/// It implements the ResolverUpdateSharder trait, allowing any
/// load-balancing (LB) policy that uses the ChildManager to split a
/// resolver update into one child per endpoint.
pub struct EndpointSharder {
pub builder: Arc<dyn LbPolicyBuilder>,
}

// Creates a ChildUpdate for each endpoint received.
impl ResolverUpdateSharder<Endpoint> for EndpointSharder {
fn shard_update(
&self,
resolver_update: ResolverUpdate,
) -> Result<Box<dyn Iterator<Item = ChildUpdate<Endpoint>>>, Box<dyn Error + Send + Sync>> {
let update: Vec<_> = resolver_update
.endpoints
.unwrap()
.into_iter()
.map(|e| ChildUpdate {
child_identifier: e.clone(),
child_policy_builder: self.builder.clone(),
child_update: ResolverUpdate {
attributes: resolver_update.attributes.clone(),
endpoints: Ok(vec![e.clone()]),
service_config: resolver_update.service_config.clone(),
resolution_note: resolver_update.resolution_note.clone(),
},
})
.collect();
Ok(Box::new(update.into_iter()))
}
}

impl EndpointSharder {
pub fn new(builder: Arc<dyn LbPolicyBuilder>) -> Self {
Self { builder }
}
}
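
A minimal usage sketch of the new type, mirroring the test setup later in this diff (the "pick_first" policy name and the surrounding context are assumptions, not part of this change):

    let builder: Arc<dyn LbPolicyBuilder> =
        GLOBAL_LB_REGISTRY.get_policy("pick_first").unwrap();
    let sharder = EndpointSharder::new(builder);
    // One child (running the given policy) will be created per endpoint.
    let child_manager: ChildManager<Endpoint> =
        ChildManager::new(Box::new(sharder), default_runtime());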

impl<T> ChildManager<T> {
/// Creates a new ChildManager LB policy. shard_update is called whenever a
/// resolver_update operation occurs.
@@ -94,6 +136,7 @@
children: Default::default(),
pending_work: Default::default(),
runtime,
updated: false,
}
}

@@ -158,8 +201,34 @@
// Update the tracked state if the child produced an update.
if let Some(state) = channel_controller.picker_update {
self.children[child_idx].state = state;
self.updated = true;
};
}

// Forwards the ResolverUpdate to all children without resharding, so
// that resolver errors can be passed down to existing children.
pub(crate) fn forward_update_to_children(
&mut self,
channel_controller: &mut dyn ChannelController,
resolver_update: ResolverUpdate,
config: Option<&LbConfig>,
) {
for child_idx in 0..self.children.len() {
let child = &mut self.children[child_idx];
let mut channel_controller = WrappedController::new(channel_controller);
let _ = child.policy.resolver_update(
resolver_update.clone(),
config,
&mut channel_controller,
);
self.resolve_child_controller(channel_controller, child_idx);
}
}

/// Reports whether any child has produced a picker update since the
/// last call, and resets the flag.
pub fn has_updated(&mut self) -> bool {
mem::take(&mut self.updated)
}
}
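
A sketch of how a parent policy might combine these two helpers; the child_manager field and the surrounding variables are hypothetical callers, not part of this diff:

    // On a resolver error, pass the update through to the existing
    // children rather than resharding, then refresh the aggregate
    // picker only if some child actually produced a new state.
    self.child_manager
        .forward_update_to_children(channel_controller, resolver_update, config);
    if self.child_manager.has_updated() {
        // Recompute and publish the aggregated picker here.
    }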

impl<T: PartialEq + Hash + Eq + Send + Sync + 'static> LbPolicy for ChildManager<T> {
@@ -306,8 +375,21 @@
}
}

fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) {
todo!("implement exit_idle")
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
        let has_idle = self
            .children
            .iter()
            .any(|child| child.state.connectivity_state == ConnectivityState::Idle);

Collaborator: I'm pretty sure this pre-processing step is actually going to be less performant than simply removing it. You can just loop through the children and exit_idle any of the ones whose state is currently idle. (With this in place you will have to loop over all elements of the vector twice, potentially, and without it you will do exactly one full pass no matter what.)
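
A minimal sketch of the single-pass variant the reviewer suggests, reusing the names from the surrounding diff:

    // Single pass: exit idle children directly, with no any() pre-scan.
    for child_idx in 0..self.children.len() {
        if self.children[child_idx].state.connectivity_state != ConnectivityState::Idle {
            continue;
        }
        let child = &mut self.children[child_idx];
        let mut channel_controller = WrappedController::new(channel_controller);
        child.policy.exit_idle(&mut channel_controller);
        self.resolve_child_controller(channel_controller, child_idx);
    }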


if !has_idle {
return;
}
for child_idx in 0..self.children.len() {
let child = &mut self.children[child_idx];
let mut channel_controller = WrappedController::new(channel_controller);
child.policy.exit_idle(&mut channel_controller);
self.resolve_child_controller(channel_controller, child_idx);
}
}
}

@@ -359,61 +441,20 @@

#[cfg(test)]
mod test {
use crate::client::load_balancing::child_manager::{
Child, ChildManager, ChildUpdate, ChildWorkScheduler, ResolverUpdateSharder,
};
use crate::client::load_balancing::child_manager::{ChildManager, EndpointSharder};
use crate::client::load_balancing::test_utils::{
self, StubPolicy, StubPolicyFuncs, TestChannelController, TestEvent, TestSubchannel,
TestWorkScheduler,
self, StubPolicyData, StubPolicyFuncs, TestChannelController, TestEvent,
};
use crate::client::load_balancing::{
ChannelController, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, ParsedJsonLbConfig,
Pick, PickResult, Picker, QueuingPicker, Subchannel, SubchannelState, GLOBAL_LB_REGISTRY,
ChannelController, LbPolicy, LbPolicyBuilder, LbState, QueuingPicker, Subchannel,
SubchannelState, GLOBAL_LB_REGISTRY,
};
use crate::client::name_resolution::{Address, Endpoint, ResolverUpdate};
use crate::client::service_config::{LbConfig, ServiceConfig};
use crate::client::ConnectivityState;
use crate::rt::{default_runtime, Runtime};
use crate::service::Request;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::error::Error;
use crate::rt::default_runtime;
use std::panic;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use tonic::metadata::MetadataMap;

// TODO: This needs to be moved to a common place that can be shared between
// round_robin and this test. This EndpointSharder maps endpoints to
// children policies.
struct EndpointSharder {
builder: Arc<dyn LbPolicyBuilder>,
}

impl ResolverUpdateSharder<Endpoint> for EndpointSharder {
fn shard_update(
&self,
resolver_update: ResolverUpdate,
) -> Result<Box<dyn Iterator<Item = ChildUpdate<Endpoint>>>, Box<dyn Error + Send + Sync>>
{
let mut sharded_endpoints = Vec::new();
for endpoint in resolver_update.endpoints.unwrap().iter() {
let child_update = ChildUpdate {
child_identifier: endpoint.clone(),
child_policy_builder: self.builder.clone(),
child_update: ResolverUpdate {
attributes: resolver_update.attributes.clone(),
endpoints: Ok(vec![endpoint.clone()]),
service_config: resolver_update.service_config.clone(),
resolution_note: resolver_update.resolution_note.clone(),
},
};
sharded_endpoints.push(child_update);
}
Ok(Box::new(sharded_endpoints.into_iter()))
}
}

// Sets up the test environment.
//
@@ -444,7 +485,7 @@
let (tx_events, rx_events) = mpsc::unbounded_channel::<TestEvent>();
let tcc = Box::new(TestChannelController { tx_events });
let builder: Arc<dyn LbPolicyBuilder> = GLOBAL_LB_REGISTRY.get_policy(test_name).unwrap();
let endpoint_sharder = EndpointSharder { builder: builder };
let endpoint_sharder = EndpointSharder::new(builder);
let child_manager = ChildManager::new(Box::new(endpoint_sharder), default_runtime());
(rx_events, Box::new(child_manager), tcc)
}
@@ -517,25 +558,26 @@
// Defines the functions resolver_update and subchannel_update to test
// aggregate_states.
fn create_verifying_funcs_for_aggregate_tests() -> StubPolicyFuncs {
        let _data = StubPolicyData::default();

Collaborator: Can you delete this since it seems to be unused?

StubPolicyFuncs {
// Closure for resolver_update. resolver_update should only receive
// one endpoint and create one subchannel for the endpoint it
// receives.
resolver_update: Some(move |update: ResolverUpdate, _, controller| {
resolver_update: Some(move |_data, update: ResolverUpdate, _, controller| {
assert_eq!(update.endpoints.iter().len(), 1);
let endpoint = update.endpoints.unwrap().pop().unwrap();
let subchannel = controller.new_subchannel(&endpoint.addresses[0]);
let _ = controller.new_subchannel(&endpoint.addresses[0]);
Ok(())
}),
// Closure for subchannel_update. Sends a picker of the same state
// that was passed to it.
subchannel_update: Some(move |updated_subchannel, state, controller| {
subchannel_update: Some(move |_data, _updated_subchannel, state, controller| {
controller.update_picker(LbState {
connectivity_state: state.connectivity_state,
picker: Arc::new(QueuingPicker {}),
});
}),
            ..Default::default()

clippy warning (GitHub Actions, on line 580 in grpc/src/client/load_balancing/child_manager.rs): struct update has no effect, all the fields in the struct have already been specified
}
}
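
For context, a self-contained illustration of that clippy lint (clippy::needless_update) using a hypothetical struct, not the one from this diff:

    #[derive(Default)]
    struct Funcs {
        resolver_update: Option<u8>,
        subchannel_update: Option<u8>,
    }

    fn build() -> Funcs {
        // Warns: every field is named, so the `..Default::default()`
        // spread supplies nothing and can simply be removed.
        Funcs {
            resolver_update: Some(1),
            subchannel_update: Some(2),
            ..Default::default()
        }
    }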

1 change: 1 addition & 0 deletions grpc/src/client/load_balancing/mod.rs
@@ -54,6 +54,7 @@ use crate::client::{

pub mod child_manager;
pub mod pick_first;
pub mod round_robin;
#[cfg(test)]
pub mod test_utils;
