Skip to content

Commit 586137e

Browse files
authored
Merge pull request #6547 from mtrqq/feat/gce-server-side-wait
Migrate GCE client to server-side wait
2 parents 082327b + 8a3b6cc commit 586137e

File tree

4 files changed

+92
-78
lines changed

4 files changed

+92
-78
lines changed

cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go

Lines changed: 53 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ import (
3737
)
3838

3939
const (
40-
defaultOperationWaitTimeout = 20 * time.Second
41-
defaultOperationPollInterval = 100 * time.Millisecond
42-
defaultOperationDeletionPollInterval = 1 * time.Second
40+
defaultOperationWaitTimeout = 20 * time.Second
41+
defaultOperationPollInterval = 100 * time.Millisecond
4342
// ErrorCodeQuotaExceeded is an error code used in InstanceErrorInfo if quota exceeded error occurs.
4443
ErrorCodeQuotaExceeded = "QUOTA_EXCEEDED"
4544

@@ -116,6 +115,11 @@ type AutoscalingGceClient interface {
116115
ResizeMig(GceRef, int64) error
117116
DeleteInstances(migRef GceRef, instances []GceRef) error
118117
CreateInstances(GceRef, string, int64, []string) error
118+
119+
// WaitForOperation can be used to poll GCE operations until completion/timeout using WAIT calls.
120+
// Calling this is normally not needed when interacting with the client, other methods should call it internally.
121+
// Can be used to extend the interface with more methods outside of this package.
122+
WaitForOperation(operationName, operationType, project, zone string) error
119123
}
120124

121125
type autoscalingGceClientV1 struct {
@@ -124,39 +128,36 @@ type autoscalingGceClientV1 struct {
124128
projectId string
125129
domainUrl string
126130

127-
// These can be overridden, e.g. for testing.
128-
operationWaitTimeout time.Duration
129-
operationPollInterval time.Duration
130-
operationDeletionPollInterval time.Duration
131+
// Can be overridden, e.g. for testing.
132+
operationWaitTimeout time.Duration
133+
operationPollInterval time.Duration
131134
}
132135

133136
// NewAutoscalingGceClientV1WithTimeout creates a new client with custom timeouts
134137
// for communicating with GCE v1 API
135-
func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string,
136-
waitTimeout, pollInterval, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) {
138+
func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string, waitTimeout time.Duration, pollInterval time.Duration) (*autoscalingGceClientV1, error) {
137139
gceService, err := gce.New(client)
138140
if err != nil {
139141
return nil, err
140142
}
141143
gceService.UserAgent = userAgent
144+
142145
return &autoscalingGceClientV1{
143-
projectId: projectId,
144-
gceService: gceService,
145-
operationWaitTimeout: waitTimeout,
146-
operationPollInterval: pollInterval,
147-
operationDeletionPollInterval: deletionPollInterval,
146+
projectId: projectId,
147+
gceService: gceService,
148+
operationWaitTimeout: waitTimeout,
149+
operationPollInterval: pollInterval,
148150
}, nil
149151
}
150152

151153
// NewAutoscalingGceClientV1 creates a new client for communicating with GCE v1 API.
152154
func NewAutoscalingGceClientV1(client *http.Client, projectId string, userAgent string) (*autoscalingGceClientV1, error) {
153-
return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout, defaultOperationPollInterval, defaultOperationDeletionPollInterval)
155+
return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout, defaultOperationPollInterval)
154156
}
155157

156158
// NewCustomAutoscalingGceClientV1 creates a new client using custom server url and timeouts
157159
// for communicating with GCE v1 API.
158-
func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string,
159-
waitTimeout, pollInterval time.Duration, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) {
160+
func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string, waitTimeout time.Duration, pollInterval time.Duration) (*autoscalingGceClientV1, error) {
160161
gceService, err := gce.New(client)
161162
if err != nil {
162163
return nil, err
@@ -165,12 +166,11 @@ func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl,
165166
gceService.UserAgent = userAgent
166167

167168
return &autoscalingGceClientV1{
168-
projectId: projectId,
169-
gceService: gceService,
170-
domainUrl: domainUrl,
171-
operationWaitTimeout: waitTimeout,
172-
operationPollInterval: pollInterval,
173-
operationDeletionPollInterval: deletionPollInterval,
169+
projectId: projectId,
170+
gceService: gceService,
171+
domainUrl: domainUrl,
172+
operationWaitTimeout: waitTimeout,
173+
operationPollInterval: pollInterval,
174174
}, nil
175175
}
176176

@@ -255,7 +255,7 @@ func (client *autoscalingGceClientV1) ResizeMig(migRef GceRef, size int64) error
255255
if err != nil {
256256
return err
257257
}
258-
return client.waitForOp(op, migRef.Project, migRef.Zone, false)
258+
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
259259
}
260260

261261
func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName string, delta int64, existingInstanceProviderIds []string) error {
@@ -272,7 +272,7 @@ func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName st
272272
if err != nil {
273273
return err
274274
}
275-
return client.waitForOp(op, migRef.Project, migRef.Zone, false)
275+
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
276276
}
277277

278278
func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool {
@@ -289,32 +289,37 @@ func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool {
289289
return instanceNames
290290
}
291291

292-
func (client *autoscalingGceClientV1) waitForOp(operation *gce.Operation, project, zone string, isDeletion bool) error {
293-
pollInterval := client.operationPollInterval
294-
if isDeletion {
295-
pollInterval = client.operationDeletionPollInterval
296-
}
297-
for start := time.Now(); time.Since(start) < client.operationWaitTimeout; time.Sleep(pollInterval) {
298-
klog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name)
299-
registerRequest("zone_operations", "get")
300-
if op, err := client.gceService.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil {
301-
klog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status)
302-
if op.Status == "DONE" {
303-
if op.Error != nil {
304-
errBytes, err := op.Error.MarshalJSON()
305-
if err != nil {
306-
errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err))
307-
}
308-
return fmt.Errorf("error while getting operation %s on %s: %s", operation.Name, operation.TargetLink, errBytes)
309-
}
292+
// WaitForOperation can be used to poll GCE operations until completion/timeout using WAIT calls.
293+
// Calling this is normally not needed when interacting with the client, other methods should call it internally.
294+
// Can be used to extend the interface with more methods outside of this package.
295+
func (client *autoscalingGceClientV1) WaitForOperation(operationName, operationType, project, zone string) error {
296+
ctx, cancel := context.WithTimeout(context.Background(), client.operationWaitTimeout)
297+
defer cancel()
298+
299+
for {
300+
klog.V(4).Infof("Waiting for operation %s/%s (%s/%s)", operationType, operationName, project, zone)
301+
registerRequest("zone_operations", "wait")
302+
op, err := client.gceService.ZoneOperations.Wait(project, zone, operationName).Context(ctx).Do()
303+
if err != nil {
304+
return fmt.Errorf("error while waiting for operation %s/%s: %w", operationType, operationName, err)
305+
}
310306

311-
return nil
307+
klog.V(4).Infof("Operation %s/%s (%s/%s) status: %s", operationType, operationName, project, zone, op.Status)
308+
if op.Status == "DONE" {
309+
if op.Error != nil {
310+
errBytes, err := op.Error.MarshalJSON()
311+
if err != nil {
312+
errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err))
313+
}
314+
return fmt.Errorf("error while waiting for operation %s/%s: %s", operationType, operationName, errBytes)
312315
}
313-
} else {
314-
klog.Warningf("Error while getting operation %s on %s: %v", operation.Name, operation.TargetLink, err)
316+
317+
return nil
315318
}
319+
320+
// NOTE: sleep in order not to overload server, as potentially response may be returned immediately
321+
time.Sleep(client.operationPollInterval)
316322
}
317-
return fmt.Errorf("timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink)
318323
}
319324

320325
func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances []GceRef) error {
@@ -330,7 +335,7 @@ func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances [
330335
if err != nil {
331336
return err
332337
}
333-
return client.waitForOp(op, migRef.Project, migRef.Zone, true)
338+
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
334339
}
335340

336341
func (client *autoscalingGceClientV1) FetchMigInstances(migRef GceRef) ([]GceInstance, error) {

cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package gce
1818

1919
import (
20+
"context"
2021
"encoding/json"
2122
"fmt"
2223
"net/http"
@@ -92,15 +93,13 @@ func TestWaitForOp(t *testing.T) {
9293
defer server.Close()
9394
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")
9495

96+
// default polling interval is too big for testing purposes
9597
g.operationPollInterval = 1 * time.Millisecond
96-
g.operationWaitTimeout = 500 * time.Millisecond
9798

98-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(3)
99-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponse).Once()
99+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Times(3)
100+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponse).Once()
100101

101-
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}
102-
103-
err := g.waitForOp(operation, projectId, zoneB, false)
102+
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOp", projectId, zoneB)
104103
assert.NoError(t, err)
105104
mock.AssertExpectationsForObjects(t, server)
106105
}
@@ -110,32 +109,41 @@ func TestWaitForOpError(t *testing.T) {
110109
defer server.Close()
111110
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")
112111

113-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponseError).Once()
114-
115-
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}
112+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponseError).Once()
116113

117-
err := g.waitForOp(operation, projectId, zoneB, false)
114+
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOpError", projectId, zoneB)
118115
assert.Error(t, err)
116+
mock.AssertExpectationsForObjects(t, server)
119117
}
120118

121119
func TestWaitForOpTimeout(t *testing.T) {
122120
server := test_util.NewHttpServerMock()
123121
defer server.Close()
124122
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")
125123

126-
// The values here are higher than in other tests since we're aiming for timeout.
127-
// Lower values make this fragile and flakey.
128-
g.operationPollInterval = 10 * time.Millisecond
129-
g.operationWaitTimeout = 49 * time.Millisecond
124+
// default polling interval and wait time are too big for the test
125+
g.operationWaitTimeout = 10 * time.Millisecond
126+
g.operationPollInterval = 20 * time.Millisecond
130127

131-
// Sometimes, only 3 calls are made, but it doesn't really matter,
132-
// so let's not assert expectations for this mock, just check for timeout error.
133-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(5)
128+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Once()
134129

135-
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}
136-
137-
err := g.waitForOp(operation, projectId, zoneB, false)
130+
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOpTimeout", projectId, zoneB)
138131
assert.Error(t, err)
132+
mock.AssertExpectationsForObjects(t, server)
133+
}
134+
135+
func TestWaitForOpContextTimeout(t *testing.T) {
136+
server := test_util.NewHttpServerMock()
137+
defer server.Close()
138+
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")
139+
140+
g.operationWaitTimeout = 10 * time.Millisecond
141+
142+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").After(time.Minute).Return(operationDoneResponse).Once()
143+
144+
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestWaitForOpContextTimeout", projectId, zoneB)
145+
assert.ErrorIs(t, err, context.DeadlineExceeded)
146+
mock.AssertExpectationsForObjects(t, server)
139147
}
140148

141149
func TestErrors(t *testing.T) {
@@ -553,12 +561,10 @@ func TestUserAgent(t *testing.T) {
553561
defer server.Close()
554562
g := newTestAutoscalingGceClient(t, "project1", server.URL, "testuseragent")
555563

556-
g.operationPollInterval = 10 * time.Millisecond
557-
g.operationWaitTimeout = 49 * time.Millisecond
558-
559-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return("testuseragent", operationRunningResponse).Maybe()
564+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return("testuseragent", operationDoneResponse).Maybe()
560565

561-
operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"}
566+
err := g.WaitForOperation("operation-1505728466148-d16f5197", "TestUserAgent", projectId, zoneB)
562567

563-
g.waitForOp(operation, projectId, zoneB, false)
568+
assert.NoError(t, err)
569+
mock.AssertExpectationsForObjects(t, server)
564570
}

cluster-autoscaler/cloudprovider/gce/gce_manager_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,6 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
329329

330330
// Override wait for op timeouts.
331331
gceService.operationWaitTimeout = 50 * time.Millisecond
332-
gceService.operationPollInterval = 1 * time.Millisecond
333332

334333
cache := &GceCache{
335334
migs: make(map[GceRef]Mig),
@@ -475,7 +474,7 @@ func TestDeleteInstances(t *testing.T) {
475474
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once()
476475

477476
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/deleteInstances").Return(deleteInstancesResponse).Once()
478-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa").Return(deleteInstancesOperationResponse).Once()
477+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa/wait").Return(deleteInstancesOperationResponse).Once()
479478

480479
instances := []GceRef{
481480
{
@@ -585,7 +584,7 @@ func TestGetAndSetMigSize(t *testing.T) {
585584

586585
// set target size for extraPoolMig; will require resize API call and API call for polling for resize operation
587586
server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%s/resize", extraPoolMigName)).Return(setMigSizeResponse).Once()
588-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931").Return(setMigSizeOperationResponse).Once()
587+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931/wait").Return(setMigSizeOperationResponse).Once()
589588
err = g.SetMigSize(extraPoolMig, 4)
590589
assert.NoError(t, err)
591590
mock.AssertExpectationsForObjects(t, server)
@@ -1511,7 +1510,7 @@ func TestAppendInstances(t *testing.T) {
15111510
defaultPoolMig := setupTestDefaultPool(g, true)
15121511
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once()
15131512
server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%v/createInstances", defaultPoolMig.gceRef.Name)).Return(createInstancesResponse).Once()
1514-
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32").Return(createInstancesOperationResponse).Once()
1513+
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32/wait").Return(createInstancesOperationResponse).Once()
15151514
err := g.CreateInstances(defaultPoolMig, 2)
15161515
assert.NoError(t, err)
15171516
mock.AssertExpectationsForObjects(t, server)

cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ func (client *mockAutoscalingGceClient) CreateInstances(_ GceRef, _ string, _ in
127127
return nil
128128
}
129129

130+
func (client *mockAutoscalingGceClient) WaitForOperation(_, _, _, _ string) error {
131+
return nil
132+
}
133+
130134
func TestFillMigInstances(t *testing.T) {
131135
migRef := GceRef{Project: "test", Zone: "zone-A", Name: "some-mig"}
132136
oldInstances := []GceInstance{

0 commit comments

Comments
 (0)