@@ -32,10 +32,11 @@ limitations under the License.
32
32
33
33
import (
34
34
"errors"
35
- "sync"
35
+ "sync/atomic "
36
36
37
37
"google.golang.org/grpc/balancer"
38
38
"google.golang.org/grpc/balancer/base"
39
+ "google.golang.org/grpc/resolver"
39
40
40
41
"vitess.io/vitess/go/vt/log"
41
42
)
@@ -54,8 +55,16 @@ func init() {
54
55
// Once a conn is chosen and is in the ready state, it will remain as the
55
56
// active subconn even if other connections become available.
56
57
type frPickerBuilder struct {
57
- mu sync.Mutex
58
- currentConn balancer.SubConn
58
+ }
59
+
60
+ type frConnWithAddr struct {
61
+ subConn balancer.SubConn
62
+ addr resolver.Address
63
+ }
64
+
65
+ type frPicker struct {
66
+ subConns []frConnWithAddr
67
+ cur uint32
59
68
}
60
69
61
70
func (f * frPickerBuilder ) Build (info base.PickerBuildInfo ) balancer.Picker {
@@ -65,36 +74,24 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
65
74
return base .NewErrPicker (errors .New ("no available connections" ))
66
75
}
67
76
68
- f .mu .Lock ()
69
- defer f .mu .Unlock ()
70
-
71
- // If we've already chosen a subconn, and it is still in the ready list, then
72
- // no need to change state
73
- if f .currentConn != nil {
74
- log .V (100 ).Infof ("first_ready: currentConn is active, checking if still ready" )
75
- for sc := range info .ReadySCs {
76
- if f .currentConn == sc {
77
- log .V (100 ).Infof ("first_ready: currentConn still active - not changing" )
78
- return f
79
- }
80
- }
77
+ subConns := make ([]frConnWithAddr , len (info .ReadySCs ))
78
+ idx := 0
79
+ for sc , k := range info .ReadySCs {
80
+ subConns [idx ] = frConnWithAddr {subConn : sc , addr : k .Address }
81
+ idx ++
81
82
}
82
83
83
- // Otherwise either we don't have an active conn or the conn we were using is
84
- // no longer active, so pick an arbitrary new one out of the map.
85
- log .V (100 ).Infof ("first_ready: currentConn is not active, picking a new one" )
86
- for sc := range info .ReadySCs {
87
- f .currentConn = sc
88
- break
89
- }
90
-
91
- return f
84
+ return & frPicker {subConns : subConns , cur : 0 }
92
85
}
93
86
94
87
// Pick simply returns the currently chosen conn
95
- func (f * frPickerBuilder ) Pick (balancer.PickInfo ) (balancer.PickResult , error ) {
96
- f .mu .Lock ()
97
- defer f .mu .Unlock ()
98
-
99
- return balancer.PickResult {SubConn : f .currentConn }, nil
88
+ func (f * frPicker ) Pick (pi balancer.PickInfo ) (balancer.PickResult , error ) {
89
+ curIndex := atomic .LoadUint32 (& f .cur )
90
+ return balancer.PickResult {SubConn : f .subConns [curIndex ].subConn , Done : func (info balancer.DoneInfo ) {
91
+ if info .Err != nil {
92
+ // Only try to move the index at most 1 - if someone else raced and advanced it, do nothing.
93
+ nextIndex := (curIndex + 1 ) % uint32 (len (f .subConns ))
94
+ atomic .CompareAndSwapUint32 (& f .cur , curIndex , nextIndex )
95
+ }
96
+ }}, nil
100
97
}
0 commit comments