Skip to content

Commit

Permalink
Update xDS only for the last chunk
Browse files Browse the repository at this point in the history
Signed-off-by: Renuka Fernando <renukapiyumal@gmail.com>
  • Loading branch information
renuka-fernando committed Feb 29, 2024
1 parent ee389d1 commit 4ba980a
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 17 deletions.
2 changes: 1 addition & 1 deletion adapter/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ var defaultConfig = &Config{
PauseTimeAfterFailure: 5,
},
InitialFetch: initialFetch{
ChunkSize: 500,
ChunkSize: 10000,
},
},
GlobalAdapter: globalAdapter{
Expand Down
9 changes: 5 additions & 4 deletions adapter/internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,15 @@ OUTER:
func fetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
if apiUUIDList == nil {
logger.LoggerMgw.Info("Fetching APIs at startup...")
fetchChunkedAPIsOnStartUp(conf, nil)
fetchChunkedAPIsOnStartUp(conf, nil, common.XdsOptions{})
} else {
logger.LoggerMgw.Infof("Fetching APIs at startup with the received API UUID list (size: %d)...", len(apiUUIDList))

chunkedAPIUuidsList := utils.ChunkSlice(apiUUIDList, conf.ControlPlane.InitialFetch.ChunkSize)
for i, chunkedAPIUuids := range chunkedAPIUuidsList {
logger.LoggerMgw.Infof("Fetching chunked APIs... [%d/%d]", i+1, len(chunkedAPIUuidsList))
fetchChunkedAPIsOnStartUp(conf, chunkedAPIUuids)
isNotFinalChunk := i != len(chunkedAPIUuidsList)-1
fetchChunkedAPIsOnStartUp(conf, chunkedAPIUuids, common.XdsOptions{SkipUpdatingXdsCache: isNotFinalChunk})
}
}

Expand All @@ -352,7 +353,7 @@ func fetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {

// fetch APIs from control plane during the server start up and push them
// to the router and enforcer components.
func fetchChunkedAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
func fetchChunkedAPIsOnStartUp(conf *config.Config, apiUUIDList []string, xdsOptions common.XdsOptions) {
// Populate data from config.
envs := conf.ControlPlane.EnvironmentLabels

Expand Down Expand Up @@ -383,7 +384,7 @@ func fetchChunkedAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
if data.Resp != nil {
// For successfull fetches, data.Resp would return a byte slice with API project(s)
logger.LoggerMgw.Debug("Pushing data to router and enforcer")
err := synchronizer.PushAPIProjects(data.Resp, envs)
err := synchronizer.PushAPIProjects(data.Resp, envs, xdsOptions)
if err != nil {
logger.LoggerMgw.Errorf("Error occurred while pushing API data: %v ", err)
}
Expand Down
6 changes: 4 additions & 2 deletions adapter/internal/api/apis_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/wso2/product-microgateway/adapter/config"
apiModel "github.com/wso2/product-microgateway/adapter/internal/api/models"
"github.com/wso2/product-microgateway/adapter/internal/common"
xds "github.com/wso2/product-microgateway/adapter/internal/discovery/xds"
"github.com/wso2/product-microgateway/adapter/internal/loggers"
"github.com/wso2/product-microgateway/adapter/internal/notifier"
Expand Down Expand Up @@ -221,7 +222,7 @@ func validateAndUpdateXds(apiProject mgw.ProjectAPI, override *bool) (err error)

// TODO: (renuka) optimize to update cache only once when all internal memory maps are updated
for vhost, environments := range vhostToEnvsMap {
_, err = xds.UpdateAPI(vhost, apiProject, environments)
_, err = xds.UpdateAPI(vhost, apiProject, environments, common.XdsOptions{})
if err != nil {
return
}
Expand All @@ -235,6 +236,7 @@ func ApplyAPIProjectFromAPIM(
payload []byte,
vhostToEnvsMap map[string][]*synchronizer.GatewayLabel,
apiEnvs map[string]map[string]synchronizer.APIEnvProps,
xdsOptions common.XdsOptions,
) (deployedRevisionList []*notifier.DeployedAPIRevision, err error) {
apiProject, err := extractAPIProject(payload)
if err != nil {
Expand Down Expand Up @@ -280,7 +282,7 @@ func ApplyAPIProjectFromAPIM(
loggers.LoggerAPI.Debugf("Update all environments (%v) of API %v %v:%v with UUID \"%v\".",
environments, vhost, apiYaml.Name, apiYaml.Version, apiYaml.ID)
// first update the API for vhost
deployedRevision, err := xds.UpdateAPI(vhost, apiProject, environments)
deployedRevision, err := xds.UpdateAPI(vhost, apiProject, environments, xdsOptions)
if err != nil {
return deployedRevisionList, fmt.Errorf("%v:%v with UUID \"%v\"", apiYaml.Name, apiYaml.Version, apiYaml.ID)
}
Expand Down
23 changes: 23 additions & 0 deletions adapter/internal/common/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package common

// XdsOptions represents the options for xDS.
type XdsOptions struct {
SkipUpdatingXdsCache bool
}
21 changes: 15 additions & 6 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/wso2/product-microgateway/adapter/config"
apiModel "github.com/wso2/product-microgateway/adapter/internal/api/models"
"github.com/wso2/product-microgateway/adapter/internal/common"
logger "github.com/wso2/product-microgateway/adapter/internal/loggers"
"github.com/wso2/product-microgateway/adapter/internal/notifier"
oasParser "github.com/wso2/product-microgateway/adapter/internal/oasparser"
Expand Down Expand Up @@ -271,7 +272,9 @@ func DeployReadinessAPI(envs []string) {
}

// UpdateAPI updates the Xds Cache when OpenAPI Json content is provided
func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, deployedEnvironments []*synchronizer.GatewayLabel) (*notifier.DeployedAPIRevision, error) {
func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, deployedEnvironments []*synchronizer.GatewayLabel,
xdsOptions common.XdsOptions) (*notifier.DeployedAPIRevision, error) {

var mgwSwagger mgw.MgwSwagger
var deployedRevision *notifier.DeployedAPIRevision
var err error
Expand Down Expand Up @@ -520,12 +523,18 @@ func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, deployedEnvironments []*
}

// TODO: (VirajSalaka) Fault tolerance mechanism implementation
revisionStatus := updateXdsCacheOnAPIAdd(oldLabels, routerLabels)
if revisionStatus {
// send updated revision to control plane
deployedRevision = notifier.UpdateDeployedRevisions(apiYaml.ID, apiYaml.RevisionID, environments,
vHost)
// Skipping the xDS cache update is fine as Choreo uses one gateway label for single Choreo Connect deployment.
if !xdsOptions.SkipUpdatingXdsCache {
logger.LoggerXds.Debugf("Updating the XDS cache for the API %v:%v", apiYaml.Name, apiYaml.Version)
updateXdsCacheOnAPIAdd(oldLabels, routerLabels)
} else {
logger.LoggerXds.Debugf("Skipping the XDS cache update for the API %v:%v", apiYaml.Name, apiYaml.Version)
}

// Send updated revision without checking the error state of xDS cache consistent state by assuming it should be consistent for the resources
// created by the Adapter.
deployedRevision = notifier.UpdateDeployedRevisions(apiYaml.ID, apiYaml.RevisionID, environments,
vHost)
if svcdiscovery.IsServiceDiscoveryEnabled {
startConsulServiceDiscovery(organizationID) //consul service discovery starting point
}
Expand Down
6 changes: 3 additions & 3 deletions adapter/internal/synchronizer/apis_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func init() {
// byte slice. This method ensures to update the enforcer and router using entries inside the
// downloaded apis.zip one by one.
// If the updating envoy or enforcer fails, this method returns an error, if not error would be nil.
func PushAPIProjects(payload []byte, environments []string) error {
func PushAPIProjects(payload []byte, environments []string, xdsOptions common.XdsOptions) error {
var deploymentList []*notifier.DeployedAPIRevision
// Reading the root zip
zipReader, err := zip.NewReader(bytes.NewReader(payload), int64(len(payload)))
Expand Down Expand Up @@ -113,7 +113,7 @@ func PushAPIProjects(payload []byte, environments []string) error {
// Pass the byte slice for the XDS APIs to push it to the enforcer and router
// TODO: (renuka) optimize applying API project, update maps one by one and apply xds once
var deployedRevisionList []*notifier.DeployedAPIRevision
deployedRevisionList, err = apiServer.ApplyAPIProjectFromAPIM(apiFileData, vhostToEnvsMap, envProps)
deployedRevisionList, err = apiServer.ApplyAPIProjectFromAPIM(apiFileData, vhostToEnvsMap, envProps, xdsOptions)
if err != nil {
logger.LoggerSync.Errorf("Error occurred while applying project %v", err)
} else if deployedRevisionList != nil {
Expand Down Expand Up @@ -228,7 +228,7 @@ func FetchAPIsFromControlPlane(updatedAPIID string, updatedEnvs []string, envToD
// For successfull fetches, data.Resp would return a byte slice with API project(s)
logger.LoggerSync.Infof("Pushing data to router and enforcer for the API %q", updatedAPIID)
receivedArtifact = true
err := PushAPIProjects(data.Resp, finalEnvs)
err := PushAPIProjects(data.Resp, finalEnvs, common.XdsOptions{})
if err != nil {
logger.LoggerSync.Errorf("Error occurred while pushing API data for the API %q: %v ", updatedAPIID, err)
}
Expand Down
2 changes: 1 addition & 1 deletion resources/conf/config.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ enabled = true
# Initial fetch configurations
[controlPlane.initialFetch]
# Number of APIs to be fetched in a single request to the Control Plane
chunkSize = 500
chunkSize = 10000

# Global Adapter related configurations
[globalAdapter]
Expand Down

0 comments on commit 4ba980a

Please sign in to comment.