feat(grpc): add round robin load balancing policy #2405
base: master
Conversation
Force-pushed from 18dea86 to 03671e4 (Compare). Commit messages: delete round robin store changes / save changes / saving / Save changes again lol / Save changes / add round robin / save changes / some changes / delete store changes / save changes / saving / Save changes again lol / Save changes / add round robin.
Haven't looked at the tests yet.
Still have to look at a few more tests.
let lb_policy = lb_policy.as_mut();
let tcc = tcc.as_mut();

let endpoints = create_n_endpoints_with_k_addresses(2, 3);
Given that the whole point of this test is to ensure that the policy moves to TF on receiving empty addresses, you can simplify this test a little by only sending one endpoint with one address initially, instead of 2 endpoints with 3 addresses each.
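For example, something along these lines, reusing a helper that already appears in this test file (a sketch; either existing helper would do):

let endpoint = create_endpoint_with_n_addresses(1);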
Ok! Done
Resolved.
let subchannels = verify_subchannel_creation_from_policy(&mut rx_events, 1).await;

let second_subchannels = verify_subchannel_creation_from_policy(&mut rx_events, 1).await;
let mut all_subchannels = subchannels.clone();
all_subchannels.extend(second_subchannels.clone());
Could this be replaced with:
let subchannels = verify_subchannel_creation_from_policy(&mut rx_events, 2).await
Done
Done
Resolved.
let new_picker = verify_roundrobin_ready_picker_from_policy(&mut rx_events).await;

let req = test_utils::new_request();
let mut picked = Vec::new();
for _ in 0..4 {
    match new_picker.pick(&req) {
        PickResult::Pick(pick) => {
            println!("picked subchannel is {}", pick.subchannel);
            picked.push(pick.subchannel.clone())
        }
        other => panic!("unexpected pick result {}", other),
    }
}

assert_eq!(&picked[0], &picked[2]);
assert_eq!(&picked[1], &picked[3]);
assert!(picked.contains(&subchannels[0]));
assert!(!picked.contains(&subchannel_being_removed));
Can you use the other helper function here instead, verify_ready_picker_from_policy, since you expect the pick to return the single Ready subchannel all the time?
This picker would have two subchannels so it might be easier to do this and make sure that it alternates.
Resolved.
// should not be a part of its picks anymore and should be removed. It should
// then roundrobin across the endpoints it still has and the new one.
#[tokio::test]
async fn roundrobin_pick_after_resolved_updated_hosts() {
IMO, the variable names are making the test a little harder to follow, especially when you call something removed_xxx and then go ahead and add it :)
Instead, if you just name them subchannel_1, subchannel_2, and subchannel_3, and state in the comments that we start off with subchannels 1 & 2 and then get an update from the resolver that removes subchannel 1 but adds subchannel 3, I think that would make it easier to read.
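For example, the test's leading comment could read something like this (hypothetical wording):

// Start with subchannels 1 and 2. A later resolver update removes subchannel 1
// and adds subchannel 3, so the picker should round-robin over 2 and 3 only.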
Done
Done
Done
Resolved.
let subchannels = verify_subchannel_creation_from_policy(&mut rx_events, 1).await;
let second_subchannels = verify_subchannel_creation_from_policy(&mut rx_events, 1).await;
Here as well, can't we just wait for two subchannels and have them in one vector, instead of two different variables which are again vectors?
Done
Resolved.
let lb_policy = lb_policy.as_mut();
let tcc = tcc.as_mut();

let endpoint = create_endpoint_with_n_addresses(2);
Just have one address.
Done
Resolved.
// once the connection succeeds, move to READY state with a picker that returns
// that subchannel.
#[tokio::test]
async fn roundrobin_with_one_backend() {
This seems to be very similar to roundrobin_simple_test, where you create a single endpoint with two addresses, while here it is a single endpoint with a single address. I think they are both testing the same scenario. Correct me if I'm wrong.
Yes, deleted.
Resolved.
// to connect to them in order, until a connection succeeds, at which point it
// should move to READY state with a picker that returns that subchannel.
#[tokio::test]
async fn roundrobin_with_multiple_backends_first_backend_is_ready() {
Same with this test. I don't see what exactly this is testing that is different from the above test and the simple test. Also, we are not actually verifying that the LB policy is attempting to connect. Given that the functionality to start connecting is pick_first functionality, we ideally shouldn't be testing that here (or if we do want to test it, we should be using the real pick_first; even then, an e2e test would be better).
So, let me know what scenario this one is testing that is different from the above-mentioned tests. Thanks.
Connecting isn't tested here. I can combine all of these into one test to make things simpler.
I ended up keeping only roundrobin_with_multiple_backends_first_backend_is_ready.
Resolved.
send_resolver_update_to_policy(lb_policy, vec![endpoints.clone()], tcc);
let subchannels = verify_subchannel_creation_from_policy(&mut rx_events, 4).await;
lb_policy.subchannel_update(subchannels[0].clone(), &SubchannelState::default(), tcc);
verify_connecting_picker_from_policy(&mut rx_events).await;
It is interesting that we have to move to Connecting here given that the update contains the previously connected address.
A lot of this is based on Pick First functionality, which I believe isn't included in my stub balancer.
Add a TODO that we wouldn't expect this state transition once we start using the actual pick_first LB policy?
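One possible wording for that TODO (hypothetical; placed next to the assertion in the test):

// TODO: once the real pick_first policy backs each endpoint, a resolver update
// that still contains the previously connected address should not force a
// transition back to CONNECTING; revisit this expectation then.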
self.last_resolver_error =
    Some("received no endpoints from the name resolver".to_string());
self.move_to_transient_failure(channel_controller);
return Err("received no endpoints from the name resolver".into());
We need to call into the child manager at some point here. Otherwise, if we had endpoints before, they'll stick around.
I was expecting RR's methods to all immediately call into child_manager and then update the picker if the child_manager reported any changes.
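A sketch of that shape, pieced together from fragments already visible in this diff (child_manager.resolver_update, has_updated, send_aggregate_picker); the surrounding method signature is elided:

// Forward the update to the child manager first, even when the endpoint
// list is empty, so children created by an earlier update are torn down.
let result = self
    .child_manager
    .resolver_update(update, config, channel_controller);

// Then publish a new aggregate picker only if the children changed.
if self.child_manager.has_updated() {
    self.send_aggregate_picker(channel_controller);
}
result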
I don't think that the Child Manager handles error cases like this.
Having zero children shouldn't be an error case, should it? Is something breaking if you do this?
fn move_children_from_idle(&mut self, channel_controller: &mut dyn ChannelController) {
    let mut should_exit_idle = false;
    for (_, state) in self.child_manager.child_states() {
Use child_states().any() instead? Or skipping it, always calling exit_idle, and putting any filtering in child_manager instead is fine.
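A sketch of the .any() form (whether an extra .iter() is needed depends on what child_states() actually returns, and the connectivity_state field name used here is an assumption):

// Replaces the explicit for loop above; add .iter() if child_states()
// returns a slice/Vec rather than an iterator.
let should_exit_idle = self
    .child_manager
    .child_states()
    .any(|(_, state)| state.connectivity_state == ConnectivityState::Idle);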
Putting it in child_manager makes more sense! I will do that.
I added filtering logic in child manager where it calls exit_idle on all children if at least one child is Idle. Let me know if you think it's better to call exit_idle on all idle children.
let result = self
    .child_manager
    .resolver_update(update, config, channel_controller);
self.move_children_from_idle(channel_controller);
This should be done inside has_updated (everywhere). If children haven't updated, there's no way their states could have changed to become idle.
That makes sense. Will fix!
self.move_children_from_idle(channel_controller);
if self.child_manager.has_updated() {
    self.send_aggregate_picker(channel_controller);
}
Actually, since you follow this exact pattern many times, it would be better to put it in a function:

fn resolve_child_updates(&mut self, channel_controller: &mut dyn ChannelController) {
    if !self.child_manager.has_updated() {
        return;
    }
    self.move_from_idle(channel_controller);
    self.send_aggregate_picker(channel_controller);
    // or just do the above things inline; that's probably fine too...
}
Done
@@ -234,7 +244,7 @@ impl LbPolicyBuilder for StubPolicyBuilder {
     &self,
     _config: &ParsedJsonLbConfig,
 ) -> Result<Option<LbConfig>, Box<dyn Error + Send + Sync>> {
-    todo!("Implement parse_config in StubPolicyBuilder")
+    todo!("Implement parse_config in StubPolicy");
Hmm ... do we need this change, given that parse_config is a method on the builder?
It seems I wrote some comments a while back and didn't send them. I'm pretty sure I didn't get all the way through, sorry, but I'll send these for now anyway.
@@ -386,6 +376,14 @@ impl<T: PartialEq + Hash + Eq + Send + Sync + 'static> LbPolicy for ChildManager
     }

     fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
         let has_idle = self
I'm pretty sure this pre-processing step is actually going to be less performant than simply removing it. You can just loop through the children and exit_idle any of the ones whose state is currently idle.
(With this in place you will have to loop over all elements of the vector twice, potentially, and without it you will do exactly one full pass no matter what.)
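A sketch of the single-pass version inside child_manager (the children, state, and policy field names are assumptions based on the fragments visible here, not the real struct layout):

fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
    // No pre-scan: walk the list once and tell each child that is
    // currently Idle to exit idle as it is encountered.
    for child in self.children.iter_mut() {
        if child.state.connectivity_state == ConnectivityState::Idle {
            child.policy.exit_idle(channel_controller);
        }
    }
}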
@@ -571,20 +558,20 @@ mod test {
     // Defines the functions resolver_update and subchannel_update to test
     // aggregate_states.
     fn create_verifying_funcs_for_aggregate_tests() -> StubPolicyFuncs {
-        let data = StubPolicyData::default();
+        let _data = StubPolicyData::default();
Can you delete this since it seems to be unused?
Add round robin load balancing policy
@dfawley @easwars
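For context on the feature itself: the heart of a round-robin picker is an atomically advancing cursor over the READY subchannels. A minimal, standard-library-only sketch (toy types, not the crate's real Picker trait or subchannel type):

use std::sync::atomic::{AtomicUsize, Ordering};

struct RoundRobinPicker<T> {
    // READY subchannels captured when this picker was built.
    ready: Vec<T>,
    // Shared cursor advanced on every pick.
    next: AtomicUsize,
}

impl<T: Clone> RoundRobinPicker<T> {
    fn pick(&self) -> Option<T> {
        if self.ready.is_empty() {
            return None;
        }
        // fetch_add hands out increasing indices across concurrent picks;
        // the modulo wraps them around the ready list.
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.ready.len();
        Some(self.ready[i].clone())
    }
}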