Skip to content

Commit 14a255d

Browse files
committed
request: init request package
That package is used to schedule random request and file that request to apiserver. Signed-off-by: Wei Fu <weifu@microsoft.com>
1 parent 7b45af5 commit 14a255d

File tree

4 files changed

+357
-6
lines changed

4 files changed

+357
-6
lines changed

random.go

Lines changed: 0 additions & 6 deletions
This file was deleted.

request/client.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package request
2+
3+
import (
4+
"math"
5+
6+
"k8s.io/client-go/rest"
7+
"k8s.io/client-go/tools/clientcmd"
8+
"k8s.io/kubectl/pkg/scheme"
9+
)
10+
11+
// NewClients creates N rest.Interface.
12+
//
13+
// FIXME(weifu):
14+
//
15+
// 1. Is it possible to build one http2 client with multiple connections?
16+
// 2. How to monitor HTTP2 GOAWAY frame?
17+
// 3. Support Protobuf as accepted content
18+
func NewClients(kubeCfgPath string, num int, userAgent string, qps int) ([]rest.Interface, error) {
19+
restCfg, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath)
20+
if err != nil {
21+
return nil, err
22+
}
23+
24+
if qps == 0 {
25+
qps = math.MaxInt32
26+
}
27+
restCfg.QPS = float32(qps)
28+
restCfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
29+
30+
restCfg.UserAgent = userAgent
31+
if restCfg.UserAgent == "" {
32+
restCfg.UserAgent = rest.DefaultKubernetesUserAgent()
33+
}
34+
35+
restClients := make([]rest.Interface, 0, num)
36+
for i := 0; i < num; i++ {
37+
cfgShallowCopy := *restCfg
38+
39+
restCli, err := rest.UnversionedRESTClientFor(&cfgShallowCopy)
40+
if err != nil {
41+
return nil, err
42+
}
43+
restClients = append(restClients, restCli)
44+
}
45+
return restClients, nil
46+
}

request/random.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package request
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"fmt"
7+
"math/big"
8+
"sync"
9+
10+
"github.com/Azure/kperf/api/types"
11+
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/runtime/schema"
14+
"k8s.io/client-go/kubernetes/scheme"
15+
"k8s.io/client-go/rest"
16+
)
17+
18+
// WeightedRandomRequests is used to generate requests based on LoadProfileSpec.
19+
type WeightedRandomRequests struct {
20+
once sync.Once
21+
wg sync.WaitGroup
22+
ctx context.Context
23+
cancel context.CancelFunc
24+
reqBuilderCh chan RequestBuilder
25+
26+
shares []int
27+
reqBuilders []RequestBuilder
28+
}
29+
30+
// NewWeightedRandomRequests creates new instance of WeightedRandomRequests.
31+
func NewWeightedRandomRequests(spec *types.LoadProfileSpec) (*WeightedRandomRequests, error) {
32+
if err := spec.Validate(); err != nil {
33+
return nil, fmt.Errorf("invalid load profile spec: %v", err)
34+
}
35+
36+
shares := make([]int, 0, len(spec.Requests))
37+
reqBuilders := make([]RequestBuilder, 0, len(spec.Requests))
38+
for _, r := range spec.Requests {
39+
shares = append(shares, r.Shares)
40+
41+
var builder RequestBuilder
42+
switch {
43+
case r.StaleList != nil:
44+
builder = newRequestListBuilder(r.StaleList, "0")
45+
case r.QuorumList != nil:
46+
builder = newRequestListBuilder(r.QuorumList, "")
47+
case r.StaleGet != nil:
48+
builder = newRequestGetBuilder(r.StaleGet, "0")
49+
case r.QuorumGet != nil:
50+
builder = newRequestGetBuilder(r.QuorumGet, "")
51+
default:
52+
return nil, fmt.Errorf("only support get/list")
53+
}
54+
reqBuilders = append(reqBuilders, builder)
55+
}
56+
57+
ctx, cancel := context.WithCancel(context.Background())
58+
return &WeightedRandomRequests{
59+
ctx: ctx,
60+
cancel: cancel,
61+
reqBuilderCh: make(chan RequestBuilder),
62+
shares: shares,
63+
reqBuilders: reqBuilders,
64+
}, nil
65+
}
66+
67+
// Run starts to random pick request.
68+
func (r *WeightedRandomRequests) Run(ctx context.Context, total int) {
69+
defer r.wg.Done()
70+
r.wg.Add(1)
71+
72+
sum := 0
73+
for sum < total {
74+
builder := r.randomPick()
75+
select {
76+
case r.reqBuilderCh <- builder:
77+
sum += 1
78+
case <-r.ctx.Done():
79+
return
80+
case <-ctx.Done():
81+
return
82+
}
83+
}
84+
}
85+
86+
// Chan returns channel to get random request.
87+
func (r *WeightedRandomRequests) Chan() chan RequestBuilder {
88+
return r.reqBuilderCh
89+
}
90+
91+
func (r *WeightedRandomRequests) randomPick() RequestBuilder {
92+
sum := 0
93+
for _, s := range r.shares {
94+
sum += s
95+
}
96+
97+
rndInt, err := rand.Int(rand.Reader, big.NewInt(int64(sum)))
98+
if err != nil {
99+
panic(err)
100+
}
101+
102+
rnd := rndInt.Int64()
103+
for i := range r.shares {
104+
s := int64(r.shares[i])
105+
if rnd < s {
106+
return r.reqBuilders[i]
107+
}
108+
rnd -= s
109+
}
110+
panic("unreachable")
111+
}
112+
113+
// Stop stops request generator.
114+
func (r *WeightedRandomRequests) Stop() {
115+
r.once.Do(func() {
116+
r.cancel()
117+
r.wg.Wait()
118+
close(r.reqBuilderCh)
119+
})
120+
}
121+
122+
// RequestBuilder is used to build rest.Request.
123+
type RequestBuilder interface {
124+
Build(cli rest.Interface) (method string, _ *rest.Request)
125+
}
126+
127+
type requestGetBuilder struct {
128+
version schema.GroupVersion
129+
resource string
130+
namespace string
131+
name string
132+
resourceVersion string
133+
}
134+
135+
func newRequestGetBuilder(src *types.RequestGet, resourceVersion string) *requestGetBuilder {
136+
return &requestGetBuilder{
137+
version: schema.GroupVersion{
138+
Group: src.Group,
139+
Version: src.Version,
140+
},
141+
resource: src.Resource,
142+
namespace: src.Namespace,
143+
name: src.Name,
144+
resourceVersion: resourceVersion,
145+
}
146+
}
147+
148+
// Build implements RequestBuilder.Build.
149+
func (b *requestGetBuilder) Build(cli rest.Interface) (string, *rest.Request) {
150+
// https://kubernetes.io/docs/reference/using-api/#api-groups
151+
apiPath := "apis"
152+
if b.version.Group == "" {
153+
apiPath = "api"
154+
}
155+
156+
comps := make([]string, 2, 5)
157+
comps[0], comps[1] = apiPath, b.version.Version
158+
if b.namespace != "" {
159+
comps = append(comps, "namespaces", b.namespace)
160+
}
161+
comps = append(comps, b.resource, b.name)
162+
163+
return "GET", cli.Get().AbsPath(comps...).
164+
SpecificallyVersionedParams(
165+
&metav1.GetOptions{ResourceVersion: b.resourceVersion},
166+
scheme.ParameterCodec,
167+
schema.GroupVersion{Version: "v1"},
168+
)
169+
}
170+
171+
type requestListBuilder struct {
172+
version schema.GroupVersion
173+
resource string
174+
namespace string
175+
limit int64
176+
labelSelector string
177+
resourceVersion string
178+
}
179+
180+
func newRequestListBuilder(src *types.RequestList, resourceVersion string) *requestListBuilder {
181+
return &requestListBuilder{
182+
version: schema.GroupVersion{
183+
Group: src.Group,
184+
Version: src.Version,
185+
},
186+
resource: src.Resource,
187+
namespace: src.Namespace,
188+
limit: int64(src.Limit),
189+
labelSelector: src.Selector,
190+
resourceVersion: resourceVersion,
191+
}
192+
}
193+
194+
// Build implements RequestBuilder.Build.
195+
func (b *requestListBuilder) Build(cli rest.Interface) (string, *rest.Request) {
196+
// https://kubernetes.io/docs/reference/using-api/#api-groups
197+
apiPath := "apis"
198+
if b.version.Group == "" {
199+
apiPath = "api"
200+
}
201+
202+
comps := make([]string, 2, 5)
203+
comps[0], comps[1] = apiPath, b.version.Version
204+
if b.namespace != "" {
205+
comps = append(comps, "namespaces", b.namespace)
206+
}
207+
comps = append(comps, b.resource)
208+
209+
return "LIST", cli.Get().AbsPath(comps...).
210+
SpecificallyVersionedParams(
211+
&metav1.ListOptions{
212+
LabelSelector: b.labelSelector,
213+
ResourceVersion: b.resourceVersion,
214+
Limit: b.limit,
215+
},
216+
scheme.ParameterCodec,
217+
schema.GroupVersion{Version: "v1"},
218+
)
219+
}

request/schedule.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package request
2+
3+
import (
4+
"context"
5+
"io"
6+
"math"
7+
"sync"
8+
"time"
9+
10+
"github.com/Azure/kperf/api/types"
11+
"github.com/Azure/kperf/metrics"
12+
13+
"golang.org/x/time/rate"
14+
"k8s.io/client-go/rest"
15+
)
16+
17+
const defaultTimeout = 60 * time.Second
18+
19+
// Schedule files requests to apiserver based on LoadProfileSpec.
20+
func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.Interface) (*types.ResponseStats, error) {
21+
ctx, cancel := context.WithCancel(ctx)
22+
23+
rndReqs, err := NewWeightedRandomRequests(spec)
24+
if err != nil {
25+
return nil, err
26+
}
27+
28+
qps := spec.Rate
29+
if qps == 0 {
30+
qps = math.MaxInt32
31+
}
32+
limiter := rate.NewLimiter(rate.Limit(qps), 10)
33+
34+
reqBuilderCh := rndReqs.Chan()
35+
var wg sync.WaitGroup
36+
37+
respMetric := metrics.NewResponseMetric()
38+
for _, cli := range restCli {
39+
cli := cli
40+
wg.Add(1)
41+
go func() {
42+
defer wg.Done()
43+
44+
for builder := range reqBuilderCh {
45+
_, req := builder.Build(cli)
46+
47+
if err := limiter.Wait(ctx); err != nil {
48+
cancel()
49+
return
50+
}
51+
52+
req = req.Timeout(defaultTimeout)
53+
func() {
54+
start := time.Now()
55+
defer func() {
56+
respMetric.ObserveLatency(time.Since(start).Seconds())
57+
}()
58+
59+
respBody, err := req.Stream(context.Background())
60+
if err == nil {
61+
defer respBody.Close()
62+
// NOTE: It's to reduce memory usage because
63+
// we don't need that unmarshal object.
64+
_, err = io.Copy(io.Discard, respBody)
65+
}
66+
if err != nil {
67+
respMetric.ObserveFailure()
68+
}
69+
}()
70+
}
71+
}()
72+
}
73+
74+
start := time.Now()
75+
76+
rndReqs.Run(ctx, spec.Total)
77+
rndReqs.Stop()
78+
wg.Wait()
79+
80+
totalDuration := time.Since(start)
81+
82+
latencies, failures, err := respMetric.Gather()
83+
if err != nil {
84+
return nil, err
85+
}
86+
return &types.ResponseStats{
87+
Total: spec.Total,
88+
Failures: failures,
89+
Duration: totalDuration,
90+
Latencies: latencies,
91+
}, nil
92+
}

0 commit comments

Comments
 (0)