From 32cbbf5308b3c18ab2eafc53923db158f3490558 Mon Sep 17 00:00:00 2001 From: Qi Ni Date: Thu, 23 Dec 2021 15:55:30 +0800 Subject: [PATCH] feat: support sending sync requests concurrently in batches when putting vmss vm (cherry picked from commit d9edcd6ec1429d8995d985b1ebddbce77bfd25ad) --- pkg/azureclients/armclient/azure_armclient.go | 142 ++++++++++++------ .../armclient/azure_armclient_test.go | 104 +++++++++++-- pkg/azureclients/armclient/interface.go | 3 + .../armclient/mockarmclient/interface.go | 115 +++++++------- .../vmssvmclient/azure_vmssvmclient.go | 10 +- .../vmssvmclient/azure_vmssvmclient_test.go | 16 +- pkg/azureclients/vmssvmclient/interface.go | 2 +- .../mockvmssvmclient/interface.go | 43 +++--- pkg/provider/azure.go | 7 + pkg/provider/azure_vmss.go | 4 +- pkg/provider/azure_vmss_test.go | 6 +- 11 files changed, 299 insertions(+), 153 deletions(-) diff --git a/pkg/azureclients/armclient/azure_armclient.go b/pkg/azureclients/armclient/azure_armclient.go index f65e6721c6..350128667c 100644 --- a/pkg/azureclients/armclient/azure_armclient.go +++ b/pkg/azureclients/armclient/azure_armclient.go @@ -416,46 +416,7 @@ func (c *Client) PutResource(ctx context.Context, resourceID string, parameters return c.PutResourceWithDecorators(ctx, resourceID, parameters, putDecorators) } -// PutResources puts a list of resources from resources map[resourceID]parameters. -// Those resources sync requests are sequential while async requests are concurrent. It's especially -// useful when the ARM API doesn't support concurrent requests. -func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse { - if len(resources) == 0 { - return nil - } - - // Sequential sync requests. - futures := make(map[string]*azure.Future) - responses := make(map[string]*PutResourcesResponse) - for resourceID, parameters := range resources { - decorators := []autorest.PrepareDecorator{ - autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}), - autorest.WithJSON(parameters), - } - request, err := c.PreparePutRequest(ctx, decorators...) - dumpRequest(request, 10) - if err != nil { - klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.prepare", resourceID, err) - responses[resourceID] = &PutResourcesResponse{ - Error: retry.NewError(false, err), - } - continue - } - - future, resp, clientErr := c.SendAsync(ctx, request) - defer c.CloseResponse(ctx, resp) - if clientErr != nil { - klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.send", resourceID, clientErr.Error()) - responses[resourceID] = &PutResourcesResponse{ - Error: clientErr, - } - continue - } - - futures[resourceID] = future - } - - // Concurrent async requests. +func (c *Client) waitAsync(ctx context.Context, futures map[string]*azure.Future, previousResponses map[string]*PutResourcesResponse) { wg := sync.WaitGroup{} var responseLock sync.Mutex for resourceID, future := range futures { @@ -478,22 +439,111 @@ func (c *Client) PutResources(ctx context.Context, resources map[string]interfac } responseLock.Lock() - responses[resourceID] = &PutResourcesResponse{ + previousResponses[resourceID] = &PutResourcesResponse{ Error: retriableErr, } responseLock.Unlock() return } + }(resourceID, future) + } + wg.Wait() +} + +// PutResources puts a list of resources from resources map[resourceID]parameters. +// Those resources sync requests are sequential while async requests are concurrent. It's especially +// useful when the ARM API doesn't support concurrent requests. +func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse { + if len(resources) == 0 { + return nil + } - responseLock.Lock() + // Sequential sync requests. + futures := make(map[string]*azure.Future) + responses := make(map[string]*PutResourcesResponse) + for resourceID, parameters := range resources { + future, rerr := c.PutResourceAsync(ctx, resourceID, parameters) + if rerr != nil { responses[resourceID] = &PutResourcesResponse{ - Response: response, + Error: rerr, } - responseLock.Unlock() - }(resourceID, future) + continue + } + futures[resourceID] = future } - wg.Wait() + c.waitAsync(ctx, futures, responses) + + return responses +} + +// PutResourcesInBatches is similar with PutResources, but it sends sync request concurrently in batches. +func (c *Client) PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse { + if len(resources) == 0 { + return nil + } + + if batchSize <= 0 { + klog.V(4).Infof("PutResourcesInBatches: batch size %d, put resources in sequence", batchSize) + return c.PutResources(ctx, resources) + } + + if batchSize > len(resources) { + klog.V(4).Infof("PutResourcesInBatches: batch size %d, but the number of the resources is %d", batchSize, resources) + batchSize = len(resources) + } + klog.V(4).Infof("PutResourcesInBatches: send sync requests in parallel with the batch size %d", batchSize) + + // Convert map to slice because it is more straightforward to + // loop over slice in batches than map. + type resourcesMeta struct { + resourceID string + parameters interface{} + } + resourcesList := make([]resourcesMeta, 0) + for resourceID, parameters := range resources { + resourcesList = append(resourcesList, resourcesMeta{ + resourceID: resourceID, + parameters: parameters, + }) + } + + // Concurrent sync requests in batches. + futures := make(map[string]*azure.Future) + responses := make(map[string]*PutResourcesResponse) + wg := sync.WaitGroup{} + var responseLock, futuresLock sync.Mutex + for i := 0; i < len(resourcesList); i += batchSize { + j := i + batchSize + if j > len(resourcesList) { + j = len(resourcesList) + } + + for k := i; k < j; k++ { + wg.Add(1) + go func(resourceID string, parameters interface{}) { + defer wg.Done() + future, rerr := c.PutResourceAsync(ctx, resourceID, parameters) + if rerr != nil { + responseLock.Lock() + responses[resourceID] = &PutResourcesResponse{ + Error: rerr, + } + responseLock.Unlock() + return + } + + futuresLock.Lock() + futures[resourceID] = future + futuresLock.Unlock() + }(resourcesList[k].resourceID, resourcesList[k].parameters) + } + wg.Wait() + } + + // Concurrent async requests. + c.waitAsync(ctx, futures, responses) + return responses } diff --git a/pkg/azureclients/armclient/azure_armclient_test.go b/pkg/azureclients/armclient/azure_armclient_test.go index 810dfb3773..06e7860119 100644 --- a/pkg/azureclients/armclient/azure_armclient_test.go +++ b/pkg/azureclients/armclient/azure_armclient_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/http/cookiejar" "net/http/httptest" + "sync" "testing" "time" @@ -343,6 +344,26 @@ func TestPutResource(t *testing.T) { } func TestPutResources(t *testing.T) { + total := 0 + server := getTestServer(t, &total) + + azConfig := azureclients.ClientConfig{Backoff: &retry.Backoff{Steps: 1}, UserAgent: "test", Location: "eastus"} + armClient := New(nil, azConfig, server.URL, "2019-01-01") + armClient.client.RetryDuration = time.Millisecond * 1 + + ctx := context.Background() + resources := map[string]interface{}{ + "/id/1": nil, + "/id/2": nil, + } + responses := armClient.PutResources(ctx, nil) + assert.Nil(t, responses) + responses = armClient.PutResources(ctx, resources) + assert.NotNil(t, responses) + assert.Equal(t, 3, total) +} + +func getTestServer(t *testing.T, counter *int) *httptest.Server { serverFuncs := []func(rw http.ResponseWriter, req *http.Request){ func(rw http.ResponseWriter, req *http.Request) { assert.Equal(t, "PUT", req.Method) @@ -372,30 +393,81 @@ func TestPutResources(t *testing.T) { }, } - i, total := 0, 0 - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + i := 0 + var l sync.Mutex + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + l.Lock() serverFuncs[i](w, r) i++ if i > 3 { i = 3 } - total++ + *counter++ + l.Unlock() })) +} - azConfig := azureclients.ClientConfig{Backoff: &retry.Backoff{Steps: 1}, UserAgent: "test", Location: "eastus"} - armClient := New(nil, azConfig, server.URL, "2019-01-01") - armClient.client.RetryDuration = time.Millisecond * 1 - - ctx := context.Background() - resources := map[string]interface{}{ - "/id/1": nil, - "/id/2": nil, +func TestPutResourcesInBatches(t *testing.T) { + for _, testCase := range []struct { + description string + resources map[string]interface{} + batchSize, expectedCallTimes int + }{ + { + description: "", + resources: map[string]interface{}{ + "/id/1": nil, + "/id/2": nil, + }, + batchSize: 2, + expectedCallTimes: 3, + }, + { + description: "", + resources: map[string]interface{}{ + "/id/1": nil, + "/id/2": nil, + }, + batchSize: 1, + expectedCallTimes: 3, + }, + { + description: "", + resources: nil, + }, + { + description: "PutResourcesInBatches should set the batch size to the length of the resources if the batch size is larger than it", + resources: map[string]interface{}{ + "/id/1": nil, + "/id/2": nil, + }, + batchSize: 10, + expectedCallTimes: 3, + }, + { + description: "PutResourcesInBatches should call PutResources if the batch size is smaller than or equal to zero", + resources: map[string]interface{}{ + "/id/1": nil, + "/id/2": nil, + }, + expectedCallTimes: 3, + }, + } { + t.Run(testCase.description, func(t *testing.T) { + total := 0 + server := getTestServer(t, &total) + + azConfig := azureclients.ClientConfig{Backoff: &retry.Backoff{Steps: 1}, UserAgent: "test", Location: "eastus"} + armClient := New(nil, azConfig, server.URL, "2019-01-01") + armClient.client.RetryDuration = time.Millisecond * 1 + + ctx := context.Background() + responses := armClient.PutResourcesInBatches(ctx, testCase.resources, testCase.batchSize) + assert.Equal(t, testCase.resources == nil, responses == nil) + assert.Equal(t, testCase.expectedCallTimes, total) + }) } - responses := armClient.PutResources(ctx, nil) - assert.Nil(t, responses) - responses = armClient.PutResources(ctx, resources) - assert.NotNil(t, responses) - assert.Equal(t, 3, total) + } func TestPutResourceAsync(t *testing.T) { diff --git a/pkg/azureclients/armclient/interface.go b/pkg/azureclients/armclient/interface.go index 20a36bb9e3..0970f3e2e0 100644 --- a/pkg/azureclients/armclient/interface.go +++ b/pkg/azureclients/armclient/interface.go @@ -70,6 +70,9 @@ type Interface interface { // useful when the ARM API doesn't support concurrent requests. PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse + // PutResourcesInBatches is similar with PutResources, but it sends sync request concurrently in batches. + PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse + // PutResourceWithDecorators puts a resource with decorators by resource ID PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) diff --git a/pkg/azureclients/armclient/mockarmclient/interface.go b/pkg/azureclients/armclient/mockarmclient/interface.go index c5f0c084a9..279086e8c6 100644 --- a/pkg/azureclients/armclient/mockarmclient/interface.go +++ b/pkg/azureclients/armclient/mockarmclient/interface.go @@ -23,40 +23,39 @@ package mockarmclient import ( context "context" - http "net/http" - reflect "reflect" - autorest "github.com/Azure/go-autorest/autorest" azure "github.com/Azure/go-autorest/autorest/azure" gomock "github.com/golang/mock/gomock" + http "net/http" + reflect "reflect" armclient "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/armclient" retry "sigs.k8s.io/cloud-provider-azure/pkg/retry" ) -// MockInterface is a mock of Interface interface. +// MockInterface is a mock of Interface interface type MockInterface struct { ctrl *gomock.Controller recorder *MockInterfaceMockRecorder } -// MockInterfaceMockRecorder is the mock recorder for MockInterface. +// MockInterfaceMockRecorder is the mock recorder for MockInterface type MockInterfaceMockRecorder struct { mock *MockInterface } -// NewMockInterface creates a new mock instance. +// NewMockInterface creates a new mock instance func NewMockInterface(ctrl *gomock.Controller) *MockInterface { mock := &MockInterface{ctrl: ctrl} mock.recorder = &MockInterfaceMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { return m.recorder } -// Send mocks base method. +// Send mocks base method func (m *MockInterface) Send(ctx context.Context, request *http.Request) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Send", ctx, request) @@ -65,13 +64,13 @@ func (m *MockInterface) Send(ctx context.Context, request *http.Request) (*http. return ret0, ret1 } -// Send indicates an expected call of Send. +// Send indicates an expected call of Send func (mr *MockInterfaceMockRecorder) Send(ctx, request interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInterface)(nil).Send), ctx, request) } -// PreparePutRequest mocks base method. +// PreparePutRequest mocks base method func (m *MockInterface) PreparePutRequest(ctx context.Context, decorators ...autorest.PrepareDecorator) (*http.Request, error) { m.ctrl.T.Helper() varargs := []interface{}{ctx} @@ -84,14 +83,14 @@ func (m *MockInterface) PreparePutRequest(ctx context.Context, decorators ...aut return ret0, ret1 } -// PreparePutRequest indicates an expected call of PreparePutRequest. +// PreparePutRequest indicates an expected call of PreparePutRequest func (mr *MockInterfaceMockRecorder) PreparePutRequest(ctx interface{}, decorators ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{ctx}, decorators...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PreparePutRequest", reflect.TypeOf((*MockInterface)(nil).PreparePutRequest), varargs...) } -// PreparePostRequest mocks base method. +// PreparePostRequest mocks base method func (m *MockInterface) PreparePostRequest(ctx context.Context, decorators ...autorest.PrepareDecorator) (*http.Request, error) { m.ctrl.T.Helper() varargs := []interface{}{ctx} @@ -104,14 +103,14 @@ func (m *MockInterface) PreparePostRequest(ctx context.Context, decorators ...au return ret0, ret1 } -// PreparePostRequest indicates an expected call of PreparePostRequest. +// PreparePostRequest indicates an expected call of PreparePostRequest func (mr *MockInterfaceMockRecorder) PreparePostRequest(ctx interface{}, decorators ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{ctx}, decorators...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PreparePostRequest", reflect.TypeOf((*MockInterface)(nil).PreparePostRequest), varargs...) } -// PrepareGetRequest mocks base method. +// PrepareGetRequest mocks base method func (m *MockInterface) PrepareGetRequest(ctx context.Context, decorators ...autorest.PrepareDecorator) (*http.Request, error) { m.ctrl.T.Helper() varargs := []interface{}{ctx} @@ -124,14 +123,14 @@ func (m *MockInterface) PrepareGetRequest(ctx context.Context, decorators ...aut return ret0, ret1 } -// PrepareGetRequest indicates an expected call of PrepareGetRequest. +// PrepareGetRequest indicates an expected call of PrepareGetRequest func (mr *MockInterfaceMockRecorder) PrepareGetRequest(ctx interface{}, decorators ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{ctx}, decorators...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareGetRequest", reflect.TypeOf((*MockInterface)(nil).PrepareGetRequest), varargs...) } -// PrepareDeleteRequest mocks base method. +// PrepareDeleteRequest mocks base method func (m *MockInterface) PrepareDeleteRequest(ctx context.Context, decorators ...autorest.PrepareDecorator) (*http.Request, error) { m.ctrl.T.Helper() varargs := []interface{}{ctx} @@ -144,14 +143,14 @@ func (m *MockInterface) PrepareDeleteRequest(ctx context.Context, decorators ... return ret0, ret1 } -// PrepareDeleteRequest indicates an expected call of PrepareDeleteRequest. +// PrepareDeleteRequest indicates an expected call of PrepareDeleteRequest func (mr *MockInterfaceMockRecorder) PrepareDeleteRequest(ctx interface{}, decorators ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{ctx}, decorators...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareDeleteRequest", reflect.TypeOf((*MockInterface)(nil).PrepareDeleteRequest), varargs...) } -// PrepareHeadRequest mocks base method. +// PrepareHeadRequest mocks base method func (m *MockInterface) PrepareHeadRequest(ctx context.Context, decorators ...autorest.PrepareDecorator) (*http.Request, error) { m.ctrl.T.Helper() varargs := []interface{}{ctx} @@ -164,14 +163,14 @@ func (m *MockInterface) PrepareHeadRequest(ctx context.Context, decorators ...au return ret0, ret1 } -// PrepareHeadRequest indicates an expected call of PrepareHeadRequest. +// PrepareHeadRequest indicates an expected call of PrepareHeadRequest func (mr *MockInterfaceMockRecorder) PrepareHeadRequest(ctx interface{}, decorators ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{ctx}, decorators...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareHeadRequest", reflect.TypeOf((*MockInterface)(nil).PrepareHeadRequest), varargs...) } -// WaitForAsyncOperationCompletion mocks base method. +// WaitForAsyncOperationCompletion mocks base method func (m *MockInterface) WaitForAsyncOperationCompletion(ctx context.Context, future *azure.Future, asyncOperationName string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WaitForAsyncOperationCompletion", ctx, future, asyncOperationName) @@ -179,13 +178,13 @@ func (m *MockInterface) WaitForAsyncOperationCompletion(ctx context.Context, fut return ret0 } -// WaitForAsyncOperationCompletion indicates an expected call of WaitForAsyncOperationCompletion. +// WaitForAsyncOperationCompletion indicates an expected call of WaitForAsyncOperationCompletion func (mr *MockInterfaceMockRecorder) WaitForAsyncOperationCompletion(ctx, future, asyncOperationName interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForAsyncOperationCompletion", reflect.TypeOf((*MockInterface)(nil).WaitForAsyncOperationCompletion), ctx, future, asyncOperationName) } -// WaitForAsyncOperationResult mocks base method. +// WaitForAsyncOperationResult mocks base method func (m *MockInterface) WaitForAsyncOperationResult(ctx context.Context, future *azure.Future, asyncOperationName string) (*http.Response, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WaitForAsyncOperationResult", ctx, future, asyncOperationName) @@ -194,13 +193,13 @@ func (m *MockInterface) WaitForAsyncOperationResult(ctx context.Context, future return ret0, ret1 } -// WaitForAsyncOperationResult indicates an expected call of WaitForAsyncOperationResult. +// WaitForAsyncOperationResult indicates an expected call of WaitForAsyncOperationResult func (mr *MockInterfaceMockRecorder) WaitForAsyncOperationResult(ctx, future, asyncOperationName interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForAsyncOperationResult", reflect.TypeOf((*MockInterface)(nil).WaitForAsyncOperationResult), ctx, future, asyncOperationName) } -// SendAsync mocks base method. +// SendAsync mocks base method func (m *MockInterface) SendAsync(ctx context.Context, request *http.Request) (*azure.Future, *http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendAsync", ctx, request) @@ -210,13 +209,13 @@ func (m *MockInterface) SendAsync(ctx context.Context, request *http.Request) (* return ret0, ret1, ret2 } -// SendAsync indicates an expected call of SendAsync. +// SendAsync indicates an expected call of SendAsync func (mr *MockInterfaceMockRecorder) SendAsync(ctx, request interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAsync", reflect.TypeOf((*MockInterface)(nil).SendAsync), ctx, request) } -// PutResource mocks base method. +// PutResource mocks base method func (m *MockInterface) PutResource(ctx context.Context, resourceID string, parameters interface{}) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutResource", ctx, resourceID, parameters) @@ -225,13 +224,13 @@ func (m *MockInterface) PutResource(ctx context.Context, resourceID string, para return ret0, ret1 } -// PutResource indicates an expected call of PutResource. +// PutResource indicates an expected call of PutResource func (mr *MockInterfaceMockRecorder) PutResource(ctx, resourceID, parameters interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResource", reflect.TypeOf((*MockInterface)(nil).PutResource), ctx, resourceID, parameters) } -// PutResources mocks base method. +// PutResources mocks base method func (m *MockInterface) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*armclient.PutResourcesResponse { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutResources", ctx, resources) @@ -239,13 +238,27 @@ func (m *MockInterface) PutResources(ctx context.Context, resources map[string]i return ret0 } -// PutResources indicates an expected call of PutResources. +// PutResources indicates an expected call of PutResources func (mr *MockInterfaceMockRecorder) PutResources(ctx, resources interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResources", reflect.TypeOf((*MockInterface)(nil).PutResources), ctx, resources) } -// PutResourceWithDecorators mocks base method. +// PutResourcesInBatches mocks base method +func (m *MockInterface) PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*armclient.PutResourcesResponse { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutResourcesInBatches", ctx, resources, batchSize) + ret0, _ := ret[0].(map[string]*armclient.PutResourcesResponse) + return ret0 +} + +// PutResourcesInBatches indicates an expected call of PutResourcesInBatches +func (mr *MockInterfaceMockRecorder) PutResourcesInBatches(ctx, resources, batchSize interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResourcesInBatches", reflect.TypeOf((*MockInterface)(nil).PutResourcesInBatches), ctx, resources, batchSize) +} + +// PutResourceWithDecorators mocks base method func (m *MockInterface) PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutResourceWithDecorators", ctx, resourceID, parameters, decorators) @@ -254,13 +267,13 @@ func (m *MockInterface) PutResourceWithDecorators(ctx context.Context, resourceI return ret0, ret1 } -// PutResourceWithDecorators indicates an expected call of PutResourceWithDecorators. +// PutResourceWithDecorators indicates an expected call of PutResourceWithDecorators func (mr *MockInterfaceMockRecorder) PutResourceWithDecorators(ctx, resourceID, parameters, decorators interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResourceWithDecorators", reflect.TypeOf((*MockInterface)(nil).PutResourceWithDecorators), ctx, resourceID, parameters, decorators) } -// PatchResource mocks base method. +// PatchResource mocks base method func (m *MockInterface) PatchResource(ctx context.Context, resourceID string, parameters interface{}) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PatchResource", ctx, resourceID, parameters) @@ -269,13 +282,13 @@ func (m *MockInterface) PatchResource(ctx context.Context, resourceID string, pa return ret0, ret1 } -// PatchResource indicates an expected call of PatchResource. +// PatchResource indicates an expected call of PatchResource func (mr *MockInterfaceMockRecorder) PatchResource(ctx, resourceID, parameters interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchResource", reflect.TypeOf((*MockInterface)(nil).PatchResource), ctx, resourceID, parameters) } -// PatchResourceAsync mocks base method. +// PatchResourceAsync mocks base method func (m *MockInterface) PatchResourceAsync(ctx context.Context, resourceID string, parameters interface{}) (*azure.Future, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PatchResourceAsync", ctx, resourceID, parameters) @@ -284,13 +297,13 @@ func (m *MockInterface) PatchResourceAsync(ctx context.Context, resourceID strin return ret0, ret1 } -// PatchResourceAsync indicates an expected call of PatchResourceAsync. +// PatchResourceAsync indicates an expected call of PatchResourceAsync func (mr *MockInterfaceMockRecorder) PatchResourceAsync(ctx, resourceID, parameters interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchResourceAsync", reflect.TypeOf((*MockInterface)(nil).PatchResourceAsync), ctx, resourceID, parameters) } -// PutResourceAsync mocks base method. +// PutResourceAsync mocks base method func (m *MockInterface) PutResourceAsync(ctx context.Context, resourceID string, parameters interface{}) (*azure.Future, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutResourceAsync", ctx, resourceID, parameters) @@ -299,13 +312,13 @@ func (m *MockInterface) PutResourceAsync(ctx context.Context, resourceID string, return ret0, ret1 } -// PutResourceAsync indicates an expected call of PutResourceAsync. +// PutResourceAsync indicates an expected call of PutResourceAsync func (mr *MockInterfaceMockRecorder) PutResourceAsync(ctx, resourceID, parameters interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResourceAsync", reflect.TypeOf((*MockInterface)(nil).PutResourceAsync), ctx, resourceID, parameters) } -// HeadResource mocks base method. +// HeadResource mocks base method func (m *MockInterface) HeadResource(ctx context.Context, resourceID string) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "HeadResource", ctx, resourceID) @@ -314,13 +327,13 @@ func (m *MockInterface) HeadResource(ctx context.Context, resourceID string) (*h return ret0, ret1 } -// HeadResource indicates an expected call of HeadResource. +// HeadResource indicates an expected call of HeadResource func (mr *MockInterfaceMockRecorder) HeadResource(ctx, resourceID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadResource", reflect.TypeOf((*MockInterface)(nil).HeadResource), ctx, resourceID) } -// GetResource mocks base method. +// GetResource mocks base method func (m *MockInterface) GetResource(ctx context.Context, resourceID, expand string) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetResource", ctx, resourceID, expand) @@ -329,13 +342,13 @@ func (m *MockInterface) GetResource(ctx context.Context, resourceID, expand stri return ret0, ret1 } -// GetResource indicates an expected call of GetResource. +// GetResource indicates an expected call of GetResource func (mr *MockInterfaceMockRecorder) GetResource(ctx, resourceID, expand interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResource", reflect.TypeOf((*MockInterface)(nil).GetResource), ctx, resourceID, expand) } -// GetResourceWithDecorators mocks base method. +// GetResourceWithDecorators mocks base method func (m *MockInterface) GetResourceWithDecorators(ctx context.Context, resourceID string, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetResourceWithDecorators", ctx, resourceID, decorators) @@ -344,13 +357,13 @@ func (m *MockInterface) GetResourceWithDecorators(ctx context.Context, resourceI return ret0, ret1 } -// GetResourceWithDecorators indicates an expected call of GetResourceWithDecorators. +// GetResourceWithDecorators indicates an expected call of GetResourceWithDecorators func (mr *MockInterfaceMockRecorder) GetResourceWithDecorators(ctx, resourceID, decorators interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResourceWithDecorators", reflect.TypeOf((*MockInterface)(nil).GetResourceWithDecorators), ctx, resourceID, decorators) } -// PostResource mocks base method. +// PostResource mocks base method func (m *MockInterface) PostResource(ctx context.Context, resourceID, action string, parameters interface{}, queryParameters map[string]interface{}) (*http.Response, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PostResource", ctx, resourceID, action, parameters, queryParameters) @@ -359,13 +372,13 @@ func (m *MockInterface) PostResource(ctx context.Context, resourceID, action str return ret0, ret1 } -// PostResource indicates an expected call of PostResource. +// PostResource indicates an expected call of PostResource func (mr *MockInterfaceMockRecorder) PostResource(ctx, resourceID, action, parameters, queryParameters interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostResource", reflect.TypeOf((*MockInterface)(nil).PostResource), ctx, resourceID, action, parameters, queryParameters) } -// DeleteResource mocks base method. +// DeleteResource mocks base method func (m *MockInterface) DeleteResource(ctx context.Context, resourceID, ifMatch string) *retry.Error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteResource", ctx, resourceID, ifMatch) @@ -373,13 +386,13 @@ func (m *MockInterface) DeleteResource(ctx context.Context, resourceID, ifMatch return ret0 } -// DeleteResource indicates an expected call of DeleteResource. +// DeleteResource indicates an expected call of DeleteResource func (mr *MockInterfaceMockRecorder) DeleteResource(ctx, resourceID, ifMatch interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteResource", reflect.TypeOf((*MockInterface)(nil).DeleteResource), ctx, resourceID, ifMatch) } -// DeleteResourceAsync mocks base method. +// DeleteResourceAsync mocks base method func (m *MockInterface) DeleteResourceAsync(ctx context.Context, resourceID, ifMatch string) (*azure.Future, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteResourceAsync", ctx, resourceID, ifMatch) @@ -388,19 +401,19 @@ func (m *MockInterface) DeleteResourceAsync(ctx context.Context, resourceID, ifM return ret0, ret1 } -// DeleteResourceAsync indicates an expected call of DeleteResourceAsync. +// DeleteResourceAsync indicates an expected call of DeleteResourceAsync func (mr *MockInterfaceMockRecorder) DeleteResourceAsync(ctx, resourceID, ifMatch interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteResourceAsync", reflect.TypeOf((*MockInterface)(nil).DeleteResourceAsync), ctx, resourceID, ifMatch) } -// CloseResponse mocks base method. +// CloseResponse mocks base method func (m *MockInterface) CloseResponse(ctx context.Context, response *http.Response) { m.ctrl.T.Helper() m.ctrl.Call(m, "CloseResponse", ctx, response) } -// CloseResponse indicates an expected call of CloseResponse. +// CloseResponse indicates an expected call of CloseResponse func (mr *MockInterfaceMockRecorder) CloseResponse(ctx, response interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseResponse", reflect.TypeOf((*MockInterface)(nil).CloseResponse), ctx, response) diff --git a/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go b/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go index ea20146a2b..8379f24492 100644 --- a/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go +++ b/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go @@ -439,7 +439,9 @@ func (page VirtualMachineScaleSetVMListResultPage) Values() []compute.VirtualMac } // UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM. -func (c *Client) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error { +// If the batch size > 0, it will send sync requests concurrently in batches, or it will send sync requests in sequence. +// No matter what the batch size is, it will process the async requests concurrently in one single batch. +func (c *Client) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string, batchSize int) *retry.Error { mc := metrics.NewMetricContext("vmssvm", "update_vms", resourceGroupName, c.subscriptionID, source) // Report errors if the client is rate limited. @@ -455,7 +457,7 @@ func (c *Client) UpdateVMs(ctx context.Context, resourceGroupName string, VMScal return rerr } - rerr := c.updateVMSSVMs(ctx, resourceGroupName, VMScaleSetName, instances) + rerr := c.updateVMSSVMs(ctx, resourceGroupName, VMScaleSetName, instances, batchSize) mc.Observe(rerr) if rerr != nil { if rerr.IsThrottled() { @@ -470,7 +472,7 @@ func (c *Client) UpdateVMs(ctx context.Context, resourceGroupName string, VMScal } // updateVMSSVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM. -func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM) *retry.Error { +func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, batchSize int) *retry.Error { resources := make(map[string]interface{}) for instanceID, parameter := range instances { resourceID := armclient.GetChildResourceID( @@ -484,7 +486,7 @@ func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VM resources[resourceID] = parameter } - responses := c.armClient.PutResources(ctx, resources) + responses := c.armClient.PutResourcesInBatches(ctx, resources, batchSize) errors := make([]*retry.Error, 0) for resourceID, resp := range responses { if resp == nil { diff --git a/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go b/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go index 3a0406204a..b30bfce7d4 100644 --- a/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go +++ b/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go @@ -650,11 +650,11 @@ func TestUpdateVMs(t *testing.T) { } armClient := mockarmclient.NewMockInterface(ctrl) - armClient.EXPECT().PutResources(gomock.Any(), testvmssVMs).Return(responses).Times(1) + armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs, 0).Return(responses).Times(1) armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(2) vmssvmClient := getTestVMSSVMClient(armClient) - rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test") + rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0) assert.Nil(t, rerr) } @@ -679,11 +679,11 @@ func TestUpdateVMsWithUpdateVMsResponderError(t *testing.T) { }, } armClient := mockarmclient.NewMockInterface(ctrl) - armClient.EXPECT().PutResources(gomock.Any(), testvmssVMs).Return(responses).Times(1) + armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs, 0).Return(responses).Times(1) armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(1) vmssvmClient := getTestVMSSVMClient(armClient) - rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test") + rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0) assert.NotNil(t, rerr) } @@ -699,7 +699,7 @@ func TestUpdateVMsNeverRateLimiter(t *testing.T) { armClient := mockarmclient.NewMockInterface(ctrl) vmssvmClient := getTestVMSSVMClientWithNeverRateLimiter(armClient) - rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test") + rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0) assert.NotNil(t, rerr) assert.Equal(t, vmssvmUpdateVMsErr, rerr) } @@ -717,7 +717,7 @@ func TestUpdateVMsRetryAfterReader(t *testing.T) { armClient := mockarmclient.NewMockInterface(ctrl) vmClient := getTestVMSSVMClientWithRetryAfterReader(armClient) - rerr := vmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test") + rerr := vmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0) assert.NotNil(t, rerr) assert.Equal(t, vmssvmUpdateVMsErr, rerr) } @@ -750,11 +750,11 @@ func TestUpdateVMsThrottle(t *testing.T) { } armClient := mockarmclient.NewMockInterface(ctrl) - armClient.EXPECT().PutResources(gomock.Any(), testvmssVMs).Return(responses).Times(1) + armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs, 0).Return(responses).Times(1) armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(1) vmssvmClient := getTestVMSSVMClient(armClient) - rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test") + rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0) assert.NotNil(t, rerr) assert.EqualError(t, throttleErr.Error(), rerr.RawError.Error()) } diff --git a/pkg/azureclients/vmssvmclient/interface.go b/pkg/azureclients/vmssvmclient/interface.go index e10775cb2d..84adbee110 100644 --- a/pkg/azureclients/vmssvmclient/interface.go +++ b/pkg/azureclients/vmssvmclient/interface.go @@ -53,5 +53,5 @@ type Interface interface { WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) *retry.Error // UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM. - UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error + UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string, batchSize int) *retry.Error } diff --git a/pkg/azureclients/vmssvmclient/mockvmssvmclient/interface.go b/pkg/azureclients/vmssvmclient/mockvmssvmclient/interface.go index cecf719649..f98312e9e2 100644 --- a/pkg/azureclients/vmssvmclient/mockvmssvmclient/interface.go +++ b/pkg/azureclients/vmssvmclient/mockvmssvmclient/interface.go @@ -23,38 +23,37 @@ package mockvmssvmclient import ( context "context" - reflect "reflect" - compute "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2020-12-01/compute" azure "github.com/Azure/go-autorest/autorest/azure" gomock "github.com/golang/mock/gomock" + reflect "reflect" retry "sigs.k8s.io/cloud-provider-azure/pkg/retry" ) -// MockInterface is a mock of Interface interface. +// MockInterface is a mock of Interface interface type MockInterface struct { ctrl *gomock.Controller recorder *MockInterfaceMockRecorder } -// MockInterfaceMockRecorder is the mock recorder for MockInterface. +// MockInterfaceMockRecorder is the mock recorder for MockInterface type MockInterfaceMockRecorder struct { mock *MockInterface } -// NewMockInterface creates a new mock instance. +// NewMockInterface creates a new mock instance func NewMockInterface(ctrl *gomock.Controller) *MockInterface { mock := &MockInterface{ctrl: ctrl} mock.recorder = &MockInterfaceMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use. +// EXPECT returns an object that allows the caller to indicate expected use func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { return m.recorder } -// Get mocks base method. +// Get mocks base method func (m *MockInterface) Get(ctx context.Context, resourceGroupName, VMScaleSetName, instanceID string, expand compute.InstanceViewTypes) (compute.VirtualMachineScaleSetVM, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", ctx, resourceGroupName, VMScaleSetName, instanceID, expand) @@ -63,13 +62,13 @@ func (m *MockInterface) Get(ctx context.Context, resourceGroupName, VMScaleSetNa return ret0, ret1 } -// Get indicates an expected call of Get. +// Get indicates an expected call of Get func (mr *MockInterfaceMockRecorder) Get(ctx, resourceGroupName, VMScaleSetName, instanceID, expand interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockInterface)(nil).Get), ctx, resourceGroupName, VMScaleSetName, instanceID, expand) } -// List mocks base method. +// List mocks base method func (m *MockInterface) List(ctx context.Context, resourceGroupName, virtualMachineScaleSetName, expand string) ([]compute.VirtualMachineScaleSetVM, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "List", ctx, resourceGroupName, virtualMachineScaleSetName, expand) @@ -78,13 +77,13 @@ func (m *MockInterface) List(ctx context.Context, resourceGroupName, virtualMach return ret0, ret1 } -// List indicates an expected call of List. +// List indicates an expected call of List func (mr *MockInterfaceMockRecorder) List(ctx, resourceGroupName, virtualMachineScaleSetName, expand interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockInterface)(nil).List), ctx, resourceGroupName, virtualMachineScaleSetName, expand) } -// Update mocks base method. +// Update mocks base method func (m *MockInterface) Update(ctx context.Context, resourceGroupName, VMScaleSetName, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Update", ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source) @@ -92,13 +91,13 @@ func (m *MockInterface) Update(ctx context.Context, resourceGroupName, VMScaleSe return ret0 } -// Update indicates an expected call of Update. +// Update indicates an expected call of Update func (mr *MockInterfaceMockRecorder) Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockInterface)(nil).Update), ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source) } -// UpdateAsync mocks base method. +// UpdateAsync mocks base method func (m *MockInterface) UpdateAsync(ctx context.Context, resourceGroupName, VMScaleSetName, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) (*azure.Future, *retry.Error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateAsync", ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source) @@ -107,13 +106,13 @@ func (m *MockInterface) UpdateAsync(ctx context.Context, resourceGroupName, VMSc return ret0, ret1 } -// UpdateAsync indicates an expected call of UpdateAsync. +// UpdateAsync indicates an expected call of UpdateAsync func (mr *MockInterfaceMockRecorder) UpdateAsync(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAsync", reflect.TypeOf((*MockInterface)(nil).UpdateAsync), ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source) } -// WaitForUpdateResult mocks base method. +// WaitForUpdateResult mocks base method func (m *MockInterface) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) *retry.Error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WaitForUpdateResult", ctx, future, resourceGroupName, source) @@ -121,22 +120,22 @@ func (m *MockInterface) WaitForUpdateResult(ctx context.Context, future *azure.F return ret0 } -// WaitForUpdateResult indicates an expected call of WaitForUpdateResult. +// WaitForUpdateResult indicates an expected call of WaitForUpdateResult func (mr *MockInterfaceMockRecorder) WaitForUpdateResult(ctx, future, resourceGroupName, source interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForUpdateResult", reflect.TypeOf((*MockInterface)(nil).WaitForUpdateResult), ctx, future, resourceGroupName, source) } -// UpdateVMs mocks base method. -func (m *MockInterface) UpdateVMs(ctx context.Context, resourceGroupName, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error { +// UpdateVMs mocks base method +func (m *MockInterface) UpdateVMs(ctx context.Context, resourceGroupName, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string, batchSize int) *retry.Error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateVMs", ctx, resourceGroupName, VMScaleSetName, instances, source) + ret := m.ctrl.Call(m, "UpdateVMs", ctx, resourceGroupName, VMScaleSetName, instances, source, batchSize) ret0, _ := ret[0].(*retry.Error) return ret0 } -// UpdateVMs indicates an expected call of UpdateVMs. -func (mr *MockInterfaceMockRecorder) UpdateVMs(ctx, resourceGroupName, VMScaleSetName, instances, source interface{}) *gomock.Call { +// UpdateVMs indicates an expected call of UpdateVMs +func (mr *MockInterfaceMockRecorder) UpdateVMs(ctx, resourceGroupName, VMScaleSetName, instances, source, batchSize interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateVMs", reflect.TypeOf((*MockInterface)(nil).UpdateVMs), ctx, resourceGroupName, VMScaleSetName, instances, source) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateVMs", reflect.TypeOf((*MockInterface)(nil).UpdateVMs), ctx, resourceGroupName, VMScaleSetName, instances, source, batchSize) } diff --git a/pkg/provider/azure.go b/pkg/provider/azure.go index f177722379..cc841da512 100644 --- a/pkg/provider/azure.go +++ b/pkg/provider/azure.go @@ -240,6 +240,9 @@ type Config struct { // `nodeIP`: vm private IPs will be attached to the inbound backend pool of the load balancer; // `podIP`: pod IPs will be attached to the inbound backend pool of the load balancer (not supported yet). LoadBalancerBackendPoolConfigurationType string `json:"loadBalancerBackendPoolConfigurationType,omitempty" yaml:"loadBalancerBackendPoolConfigurationType,omitempty"` + // PutVMSSVMBatchSize defines how many requests the client send concurrently when putting the VMSS VMs. + // If it is smaller than or equal to zero, the request will be sent one by one in sequence (default). + PutVMSSVMBatchSize int `json:"putVMSSVMBatchSize" yaml:"putVMSSVMBatchSize"` } type InitSecretConfig struct { @@ -640,6 +643,10 @@ func (az *Cloud) isLBBackendPoolTypeNodeIP() bool { return strings.EqualFold(az.LoadBalancerBackendPoolConfigurationType, consts.LoadBalancerBackendPoolConfigurationTypeNodeIP) } +func (az *Cloud) getPutVMSSVMBatchSize() int { + return az.PutVMSSVMBatchSize +} + func (az *Cloud) initCaches() (err error) { az.vmCache, err = az.newVMCache() if err != nil { diff --git a/pkg/provider/azure_vmss.go b/pkg/provider/azure_vmss.go index b2a6ad3708..0a14b870ca 100644 --- a/pkg/provider/azure_vmss.go +++ b/pkg/provider/azure_vmss.go @@ -1330,7 +1330,7 @@ func (ss *ScaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac ctx, cancel := getContextWithCancel() defer cancel() klog.V(2).Infof("EnsureHostInPool begins to UpdateVMs for VMSS(%s, %s) with new backendPoolID %s", meta.resourceGroup, meta.vmssName, backendPoolID) - rerr := ss.VirtualMachineScaleSetVMsClient.UpdateVMs(ctx, meta.resourceGroup, meta.vmssName, update, "network_update") + rerr := ss.VirtualMachineScaleSetVMsClient.UpdateVMs(ctx, meta.resourceGroup, meta.vmssName, update, "network_update", ss.getPutVMSSVMBatchSize()) if rerr != nil { klog.Errorf("EnsureHostInPool UpdateVMs for VMSS(%s, %s) failed with error %v", meta.resourceGroup, meta.vmssName, rerr.Error()) return rerr.Error() @@ -1592,7 +1592,7 @@ func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, ctx, cancel := getContextWithCancel() defer cancel() klog.V(2).Infof("EnsureBackendPoolDeleted begins to UpdateVMs for VMSS(%s, %s) with backendPoolID %s", meta.resourceGroup, meta.vmssName, backendPoolID) - rerr := ss.VirtualMachineScaleSetVMsClient.UpdateVMs(ctx, meta.resourceGroup, meta.vmssName, update, "network_update") + rerr := ss.VirtualMachineScaleSetVMsClient.UpdateVMs(ctx, meta.resourceGroup, meta.vmssName, update, "network_update", ss.getPutVMSSVMBatchSize()) if rerr != nil { klog.Errorf("EnsureBackendPoolDeleted UpdateVMs for VMSS(%s, %s) failed with error %v", meta.resourceGroup, meta.vmssName, rerr.Error()) return rerr.Error() diff --git a/pkg/provider/azure_vmss_test.go b/pkg/provider/azure_vmss_test.go index 7c03497674..62af858beb 100644 --- a/pkg/provider/azure_vmss_test.go +++ b/pkg/provider/azure_vmss_test.go @@ -2190,7 +2190,7 @@ func TestEnsureHostsInPool(t *testing.T) { expectedVMSSVMs, _, _ := buildTestVirtualMachineEnv(ss.cloud, testVMSSName, "", 0, []string{"vmss-vm-000000", "vmss-vm-000001", "vmss-vm-000002"}, "", false) mockVMSSVMClient := ss.cloud.VirtualMachineScaleSetVMsClient.(*mockvmssvmclient.MockInterface) mockVMSSVMClient.EXPECT().List(gomock.Any(), ss.ResourceGroup, testVMSSName, gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() - mockVMSSVMClient.EXPECT().UpdateVMs(gomock.Any(), ss.ResourceGroup, testVMSSName, gomock.Any(), gomock.Any()).Return(nil).Times(test.expectedVMSSVMPutTimes) + mockVMSSVMClient.EXPECT().UpdateVMs(gomock.Any(), ss.ResourceGroup, testVMSSName, gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(test.expectedVMSSVMPutTimes) mockVMClient := ss.cloud.VirtualMachinesClient.(*mockvmclient.MockInterface) mockVMClient.EXPECT().List(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() @@ -2481,7 +2481,7 @@ func TestEnsureBackendPoolDeleted(t *testing.T) { expectedVMSSVMs, _, _ := buildTestVirtualMachineEnv(ss.cloud, testVMSSName, "", 0, []string{"vmss-vm-000000", "vmss-vm-000001", "vmss-vm-000002"}, "", false) mockVMSSVMClient := ss.cloud.VirtualMachineScaleSetVMsClient.(*mockvmssvmclient.MockInterface) mockVMSSVMClient.EXPECT().List(gomock.Any(), ss.ResourceGroup, testVMSSName, gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() - mockVMSSVMClient.EXPECT().UpdateVMs(gomock.Any(), ss.ResourceGroup, testVMSSName, gomock.Any(), gomock.Any()).Return(test.vmClientErr).Times(test.expectedVMSSVMPutTimes) + mockVMSSVMClient.EXPECT().UpdateVMs(gomock.Any(), ss.ResourceGroup, testVMSSName, gomock.Any(), gomock.Any(), gomock.Any()).Return(test.vmClientErr).Times(test.expectedVMSSVMPutTimes) err = ss.EnsureBackendPoolDeleted(&v1.Service{}, test.backendpoolID, testVMSSName, test.backendAddressPools, true) assert.Equal(t, test.expectedErr, err != nil, test.description+", but an error occurs") @@ -2554,7 +2554,7 @@ func TestEnsureBackendPoolDeletedConcurrently(t *testing.T) { mockVMSSVMClient.EXPECT().List(gomock.Any(), "rg1", "vmss-0", gomock.Any()).Return(nil, nil).AnyTimes() mockVMSSVMClient.EXPECT().List(gomock.Any(), ss.ResourceGroup, "vmss-0", gomock.Any()).Return(expectedVMSSVMsOfVMSS0, nil).AnyTimes() mockVMSSVMClient.EXPECT().List(gomock.Any(), ss.ResourceGroup, "vmss-1", gomock.Any()).Return(expectedVMSSVMsOfVMSS1, nil).AnyTimes() - mockVMSSVMClient.EXPECT().UpdateVMs(gomock.Any(), ss.ResourceGroup, gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(2) + mockVMSSVMClient.EXPECT().UpdateVMs(gomock.Any(), ss.ResourceGroup, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(2) backendpoolAddressIDs := []string{testLBBackendpoolID0, testLBBackendpoolID1, testLBBackendpoolID2} testVMSSNames := []string{"vmss-0", "vmss-1", "vmss-2"}