@@ -24,16 +24,15 @@ import (
24
24
"time"
25
25
26
26
"github.com/bytedance/gopkg/cloud/metainfo"
27
- "github.com/cloudwego/kitex/pkg/endpoint"
28
- "github.com/cloudwego/kitex/pkg/kerrors"
29
-
30
27
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/stability"
31
28
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/stability/stservice"
32
29
"github.com/cloudwego/kitex-tests/pkg/test"
33
30
"github.com/cloudwego/kitex-tests/thriftrpc"
34
31
"github.com/cloudwego/kitex/client"
35
32
"github.com/cloudwego/kitex/client/callopt"
36
33
"github.com/cloudwego/kitex/pkg/circuitbreak"
34
+ "github.com/cloudwego/kitex/pkg/endpoint"
35
+ "github.com/cloudwego/kitex/pkg/kerrors"
37
36
"github.com/cloudwego/kitex/pkg/remote"
38
37
"github.com/cloudwego/kitex/pkg/retry"
39
38
"github.com/cloudwego/kitex/pkg/rpcinfo"
@@ -79,31 +78,69 @@ func genCBKey(ri rpcinfo.RPCInfo) string {
79
78
}
80
79
81
80
func TestRetryCB (t * testing.T ) {
82
- atomic .StoreInt32 (& testSTReqCount , - 1 )
83
- // retry config
84
- fp := retry .NewFailurePolicy ()
85
-
81
+ reqCount := int32 (0 )
86
82
cli = getKitexClient (
87
- transport .PurePayload ,
88
- client .WithFailureRetry (fp ),
83
+ transport .TTHeader ,
84
+ client .WithRetryContainer (retry .NewRetryContainer ()), // use default circuit breaker
85
+ client .WithFailureRetry (retry .NewFailurePolicy ()),
89
86
client .WithRPCTimeout (20 * time .Millisecond ),
87
+ client .WithMiddleware (func (next endpoint.Endpoint ) endpoint.Endpoint {
88
+ return func (ctx context.Context , req , resp interface {}) (err error ) {
89
+ count := atomic .AddInt32 (& reqCount , 1 )
90
+ if count % 10 == 0 {
91
+ time .Sleep (50 * time .Millisecond )
92
+ }
93
+ return nil // no need to send the real request
94
+ }
95
+ }),
90
96
)
91
97
92
98
ctx , stReq := thriftrpc .CreateSTRequest (context .Background ())
93
99
cbCount := 0
94
100
for i := 0 ; i < 300 ; i ++ {
95
- stResp , err := cli .TestSTReq (ctx , stReq )
96
- if err != nil {
97
- test .Assert (t , strings .Contains (err .Error (), "retry circuit break" ), err , i )
101
+ _ , err := cli .TestSTReq (ctx , stReq )
102
+ if err != nil && strings .Contains (err .Error (), "retry circuit break" ) {
98
103
cbCount ++
99
- } else {
100
- test .Assert (t , err == nil , err , i )
101
- test .Assert (t , stReq .Str == stResp .Str )
102
104
}
103
105
}
106
+ test .Assert (t , reqCount == 300 , reqCount )
104
107
test .Assert (t , cbCount == 30 , cbCount )
105
108
}
106
109
110
+ func TestRetryCBPercentageLimit (t * testing.T ) {
111
+ reqCount := int32 (0 )
112
+ cli := getKitexClient (
113
+ transport .TTHeaderFramed ,
114
+ client .WithRetryContainer (retry .NewRetryContainerWithPercentageLimit ()),
115
+ client .WithFailureRetry (retry .NewFailurePolicy ()), // cb threshold = 10%
116
+ client .WithRPCTimeout (20 * time .Millisecond ),
117
+ client .WithMiddleware (func (next endpoint.Endpoint ) endpoint.Endpoint {
118
+ return func (ctx context.Context , req , resp interface {}) (err error ) {
119
+ atomic .AddInt32 (& reqCount , 1 )
120
+ if tm := getSleepTimeMS (ctx ); tm > 0 {
121
+ time .Sleep (tm )
122
+ }
123
+ return nil // no need to send a real request
124
+ }
125
+ }),
126
+ )
127
+ ctx , stReq := thriftrpc .CreateSTRequest (context .Background ())
128
+ ctx = setSkipCounterSleep (ctx )
129
+ cbCount := 0
130
+ for i := 0 ; i < 300 ; i ++ {
131
+ reqCtx := ctx
132
+ if i % 5 == 0 {
133
+ reqCtx = metainfo .WithPersistentValue (ctx , sleepTimeMsKey , "50" )
134
+ }
135
+ _ , err := cli .TestSTReq (reqCtx , stReq )
136
+ if err != nil && strings .Contains (err .Error (), "retry circuit break" ) {
137
+ cbCount ++
138
+ }
139
+ }
140
+ test .Assert (t , reqCount == 333 , reqCount )
141
+ test .Assert (t , cbCount == 58 , cbCount )
142
+ }
143
+
107
144
func TestNoCB (t * testing.T ) {
108
145
atomic .StoreInt32 (& testSTReqCount , - 1 )
109
146
// retry config
0 commit comments