From 3ccfb4a833fb6340e4f1775a011550de262e9ade Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Wed, 24 Jul 2024 16:48:53 +0200 Subject: [PATCH] add a warmup time option to discovery (#465) When there are a lot more targets than the number of connections in the pool, then it's possible that if the list of hosts changes, the builder might pick a totally new set of hosts than the previously selected ones, none of which will have established subconns. Instead of giving this new list to the picker immediately, first combine it with the list of hosts that were previously selected, so that those subconns have some time to warm up while the current set is still in the list. --- go/vt/vtgateproxy/discovery.go | 37 ++++++++++++++++++++++++-------- go/vt/vtgateproxy/vtgateproxy.go | 1 + 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 7a019e49030..7a5df534e96 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -62,9 +62,10 @@ const PoolTypeAttr = "PoolType" // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). type JSONGateResolver struct { - target resolver.Target - clientConn resolver.ClientConn - poolType string + target resolver.Target + clientConn resolver.ClientConn + poolType string + currentAddrs []resolver.Address } func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} @@ -210,15 +211,24 @@ func (b *JSONGateResolverBuilder) start() error { } parseCount.Add("changed", 1) + var wg sync.WaitGroup + + // notify all the resolvers that the targets changed in parallel, since each update might sleep for + // the warmup time b.mu.RLock() - // notify all the resolvers that the targets changed for _, r := range b.resolvers { - err = b.update(r) - if err != nil { - log.Errorf("Failed to update resolver: %v", err) - } + wg.Add(1) + go func(r *JSONGateResolver) { + defer wg.Done() + + err = b.update(r) + if err != nil { + log.Errorf("Failed to update resolver: %v", err) + } + }(r) } b.mu.RUnlock() + wg.Wait() } }() @@ -393,8 +403,17 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error { addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attributes.New(PoolTypeAttr, r.poolType)}) } - log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) + // If we've already selected some targets, give the new addresses some time to warm up before removing + // the old ones from the list + if r.currentAddrs != nil && warmupTime.Seconds() > 0 { + combined := append(r.currentAddrs, addrs...) + log.V(100).Infof("updating targets for %s to warmup %v", r.target.URL.String(), targets) + r.clientConn.UpdateState(resolver.State{Addresses: combined}) + time.Sleep(*warmupTime) + } + log.V(100).Infof("updating targets for %s after warmup to %v", r.target.URL.String(), targets) + r.currentAddrs = addrs return r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index e9488b022c8..8de456781f3 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -59,6 +59,7 @@ var ( addressField = flag.String("address_field", "address", "field name in the json file containing the address") portField = flag.String("port_field", "port", "field name in the json file containing the port") balancerType = flag.String("balancer", "round_robin", "load balancing algorithm to use") + warmupTime = flag.Duration("warmup_time", 30*time.Second, "time to maintain connections to previously selected hosts") timings = stats.NewTimings("Timings", "proxy timings by operation", "operation")