
Commit

Merge branch 'choreo' of github.com:wso2/product-microgateway into choreo
senthalan committed Nov 20, 2023
2 parents 9b4a6ec + 55cc214 commit c910fe0
Showing 28 changed files with 115 additions and 54 deletions.
6 changes: 4 additions & 2 deletions adapter/internal/adapter/adapter.go
@@ -177,7 +177,9 @@ func runManagementServer(conf *config.Config, server xdsv3.Server, rlsServer xds
 	discoveryv3.RegisterAggregatedDiscoveryServiceServer(rlsGrpcServer, rlsServer)
 	go func() {
 		logger.LoggerMgw.Info("Starting Rate Limiter xDS gRPC server.")
+		health.RateLimiterGrpcService.SetStatus(true)
 		if err = rlsGrpcServer.Serve(rlsLis); err != nil {
+			health.RateLimiterGrpcService.SetStatus(false)
 			logger.LoggerMgw.Error("Error serving Rate Limiter xDS gRPC server: ", err)
 		}
 	}()
@@ -370,9 +372,9 @@ func fetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
 		logger.LoggerMgw.Errorf("Error occurred while fetching data from control plane: %v", data.Err)
 		health.SetControlPlaneRestAPIStatus(false)
 		if conf.ControlPlane.DynamicEnvironments.Enabled {
-			sync.RetryFetchingAPIs(c, data, sync.RetrieveRuntimeArtifactEndpoint, true, queryParamMap)
+			sync.RetryFetchingAPIs(c, data, sync.RetrieveRuntimeArtifactEndpoint, true, queryParamMap, apiUUIDList)
 		} else {
-			sync.RetryFetchingAPIs(c, data, sync.RuntimeArtifactEndpoint, true, queryParamMap)
+			sync.RetryFetchingAPIs(c, data, sync.RuntimeArtifactEndpoint, true, queryParamMap, apiUUIDList)
 		}
 	}
 }
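The new health.RateLimiterGrpcService.SetStatus calls tie the rate limiter xDS server's health to the lifetime of its Serve loop. A minimal Go sketch of that pattern follows; the setStatus helper and the port are illustrative stand-ins, not the adapter's actual health package API.

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

// setStatus is an illustrative stand-in for the adapter's health registry;
// the real code calls health.RateLimiterGrpcService.SetStatus(...).
func setStatus(component string, healthy bool) {
	log.Printf("health: %s healthy=%t", component, healthy)
}

func main() {
	lis, err := net.Listen("tcp", "localhost:18005") // port chosen only for this sketch
	if err != nil {
		log.Fatal(err)
	}
	srv := grpc.NewServer()

	go func() {
		// Mark the component healthy just before Serve starts blocking,
		// and flip it back to unhealthy if Serve ever returns an error.
		setStatus("RateLimiterGrpcService", true)
		if err := srv.Serve(lis); err != nil {
			setStatus("RateLimiterGrpcService", false)
			log.Println("rate limiter xDS server stopped:", err)
		}
	}()

	select {} // keep the sketch alive; the real adapter blocks elsewhere
}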
@@ -53,11 +53,7 @@ func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
 // OnStreamRequest prints debug logs
 func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error {
 	nodeIdentifier := common.GetNodeIdentifier(request)
-	if nodeQueueInstance.IsNewNode(nodeIdentifier) {
-		logger.LoggerEnforcerXdsCallbacks.Infof("stream request on stream id: %d, from node: %s, version: %s",
-			id, nodeIdentifier, request.VersionInfo)
-	}
-	logger.LoggerEnforcerXdsCallbacks.Debugf("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
+	logger.LoggerEnforcerXdsCallbacks.Infof("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
 		id, nodeIdentifier, request.GetVersionInfo(), request.GetTypeUrl())
 	if request.ErrorDetail != nil {
 		logger.LoggerEnforcerXdsCallbacks.Errorf("Stream request for type %s on stream id: %d Error: %s", request.GetTypeUrl(),
@@ -82,7 +78,7 @@ func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryReque
 func (cb *Callbacks) OnStreamResponse(context context.Context, id int64, request *discovery.DiscoveryRequest,
 	response *discovery.DiscoveryResponse) {
 	nodeIdentifier := common.GetNodeIdentifier(request)
-	logger.LoggerEnforcerXdsCallbacks.Debugf("stream response on stream id: %d node: %s for type: %s version: %s",
+	logger.LoggerEnforcerXdsCallbacks.Infof("stream response on stream id: %d node: %s for type: %s version: %s",
 		id, nodeIdentifier, request.GetTypeUrl(), response.GetVersionInfo())
 }
@@ -53,11 +53,7 @@ func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
 // OnStreamRequest prints debug logs
 func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error {
 	nodeIdentifier := common.GetNodeIdentifier(request)
-	if nodeQueueInstance.IsNewNode(nodeIdentifier) {
-		logger.LoggerRouterXdsCallbacks.Infof("stream request on stream id: %d, from node: %s, version: %s",
-			id, nodeIdentifier, request.VersionInfo)
-	}
-	logger.LoggerRouterXdsCallbacks.Debugf("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
+	logger.LoggerRouterXdsCallbacks.Infof("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
 		id, nodeIdentifier, request.VersionInfo, request.TypeUrl)
 	if request.ErrorDetail != nil {
 		logger.LoggerEnforcerXdsCallbacks.Errorf("Stream request for type %s on stream id: %d, from node: %s, Error: %s", request.GetTypeUrl(),
@@ -70,7 +66,7 @@ func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryReque
 func (cb *Callbacks) OnStreamResponse(context context.Context, id int64, request *discovery.DiscoveryRequest,
 	response *discovery.DiscoveryResponse) {
 	nodeIdentifier := common.GetNodeIdentifier(request)
-	logger.LoggerRouterXdsCallbacks.Debugf("stream response on stream id: %d, to node: %s, version: %s, for type: %v", id,
+	logger.LoggerRouterXdsCallbacks.Infof("stream response on stream id: %d, to node: %s, version: %s, for type: %v", id,
 		nodeIdentifier, response.VersionInfo, response.TypeUrl)
 }
6 changes: 3 additions & 3 deletions adapter/internal/discovery/xds/semantic_versioning.go
@@ -37,10 +37,10 @@ func GetMajorMinorVersionRangeRegex(semVersion semantic_version.SemVersion) stri
 	majorVersion := strconv.Itoa(semVersion.Major)
 	minorVersion := strconv.Itoa(semVersion.Minor)
 	if semVersion.Patch == nil {
-		return "v" + majorVersion + "(\\." + minorVersion + ")?"
+		return "v" + majorVersion + "(?:\\." + minorVersion + ")?"
 	}
 	patchVersion := strconv.Itoa(*semVersion.Patch)
-	return "v" + majorVersion + "(\\." + minorVersion + "(\\." + patchVersion + ")?)?"
+	return "v" + majorVersion + "(?:\\." + minorVersion + "(?:\\." + patchVersion + ")?)?"
 }

 // GetMinorVersionRangeRegex generates minor version compatible range regex for the given version
@@ -51,7 +51,7 @@ func GetMinorVersionRangeRegex(semVersion semantic_version.SemVersion) string {
 	majorVersion := strconv.Itoa(semVersion.Major)
 	minorVersion := strconv.Itoa(semVersion.Minor)
 	patchVersion := strconv.Itoa(*semVersion.Patch)
-	return "v" + majorVersion + "\\." + minorVersion + "(\\." + patchVersion + ")?"
+	return "v" + majorVersion + "\\." + minorVersion + "(?:\\." + patchVersion + ")?"
 }

 // GetMajorVersionRange generates major version range for the given version
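The regex changes above swap capturing groups for non-capturing (?:...) groups. Both forms accept exactly the same version strings; the non-capturing form simply produces no numbered submatches, which matters if these fragments are embedded in a larger pattern that relies on group numbering. A small stand-alone Go sketch of the difference (the v1 / v1.2 / v1.2.3 literals are illustrative, not built by the adapter's helpers):

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Capturing form (old) vs non-capturing form (new) for v1, v1.2, v1.2.3.
	capturing := regexp.MustCompile(`^v1(\.2(\.3)?)?$`)
	nonCapturing := regexp.MustCompile(`^v1(?:\.2(?:\.3)?)?$`)

	for _, v := range []string{"v1", "v1.2", "v1.2.3"} {
		// Both patterns match the same set of strings.
		fmt.Println(v, capturing.MatchString(v), nonCapturing.MatchString(v))
	}

	// Only the capturing form creates numbered submatches.
	fmt.Println(capturing.FindStringSubmatch("v1.2.3"))          // [v1.2.3 .2.3 .3]
	fmt.Println(nonCapturing.FindStringSubmatch("v1.2.3"))       // [v1.2.3]
	fmt.Println(capturing.NumSubexp(), nonCapturing.NumSubexp()) // 2 0
}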
6 changes: 3 additions & 3 deletions adapter/internal/eventhub/subscription.go
@@ -131,7 +131,7 @@ func LoadSubscriptionData(configFile *config.Config, initialAPIUUIDListMap map[s
 		} else {
 			// Keep the iteration going on until a response is recieved.
 			logger.LoggerSync.Errorf("Error occurred while fetching data from control plane: %v", data.Error)
-			go func(d response) {
+			go func(d response, endpoint string, responseType interface{}) {
 				// Retry fetching from control plane after a configured time interval
 				if conf.ControlPlane.RetryInterval == 0 {
 					// Assign default retry interval
@@ -140,8 +140,8 @@ func LoadSubscriptionData(configFile *config.Config, initialAPIUUIDListMap map[s
 				logger.LoggerSync.Debugf("Time Duration for retrying: %v", conf.ControlPlane.RetryInterval*time.Second)
 				time.Sleep(conf.ControlPlane.RetryInterval * time.Second)
 				logger.LoggerSync.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", conf.ControlPlane.RetryInterval*time.Second)
-				go InvokeService(url.endpoint, url.responseType, nil, responseChannel, 0)
-			}(data)
+				go InvokeService(endpoint, responseType, nil, responseChannel, 0)
+			}(data, url.endpoint, url.responseType)
 		}
 	}
 }
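The go func(d response, endpoint string, responseType interface{}) change passes the values the retry goroutine needs as parameters instead of closing over url.endpoint and url.responseType, which can refer to a different iteration by the time the delayed retry actually runs. A self-contained sketch of why passing parameters is safer (pre-Go 1.22 loop-variable semantics; the endpoint names are made up):

package main

import (
	"fmt"
	"sync"
)

func main() {
	urls := []string{"/apis", "/applications", "/subscriptions"}
	var wg sync.WaitGroup

	// Risky: the closure reads the shared loop variable, which may have
	// moved on by the time the goroutine runs (pre-Go 1.22 semantics).
	for _, u := range urls {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("closure sees:", u)
		}()
	}
	wg.Wait()

	// Safe: pass the value as a parameter, so each goroutine gets its own copy.
	for _, u := range urls {
		wg.Add(1)
		go func(endpoint string) {
			defer wg.Done()
			fmt.Println("parameter sees:", endpoint)
		}(u)
	}
	wg.Wait()
}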
2 changes: 1 addition & 1 deletion adapter/internal/synchronizer/apis_fetcher.go
@@ -238,7 +238,7 @@ func FetchAPIsFromControlPlane(updatedAPIID string, updatedEnvs []string, envToD
 	} else {
 		// Keep the iteration still until all the envrionment response properly.
 		logger.LoggerSync.Errorf("Error occurred while fetching data from control plane for the API %q: %v. Hence retrying..", updatedAPIID, data.Err)
-		sync.RetryFetchingAPIs(c, data, sync.RuntimeArtifactEndpoint, true, queryParamMap)
+		sync.RetryFetchingAPIs(c, data, sync.RuntimeArtifactEndpoint, true, queryParamMap, nil)
 		retryCounter++
 	}
 }
6 changes: 4 additions & 2 deletions adapter/pkg/health/health.go
@@ -19,9 +19,10 @@ package health

 import (
 	"context"
+	"sync"
+
 	healthservice "github.com/wso2/product-microgateway/adapter/pkg/health/api/wso2/health/service"
 	logger "github.com/wso2/product-microgateway/adapter/pkg/loggers"
-	"sync"
 )

 var (
@@ -35,7 +36,8 @@ var (

 // Service components to be set health status
 const (
-	RestService service = "adapter.internal.RestService"
+	RestService            service = "adapter.internal.RestService"
+	RateLimiterGrpcService service = "adapter.internal.RateLimiterGrpcService"
 )

 type service string
4 changes: 2 additions & 2 deletions adapter/pkg/synchronizer/apis_fetcher.go
@@ -227,7 +227,7 @@ func ConstructControlPlaneRequest(id *string, gwLabel []string, controlPlanePara

 // RetryFetchingAPIs function keeps retrying to fetch APIs from runtime-artifact endpoint.
 func RetryFetchingAPIs(c chan SyncAPIResponse, data SyncAPIResponse, endpoint string, sendType bool,
-	queryParamMap map[string]string) {
+	queryParamMap map[string]string, apiUUIDList []string) {
 	retryInterval := workerPool.controlPlaneParams.retryInterval

 	// Retry fetching from control plane after a configured time interval
@@ -240,7 +240,7 @@ func RetryFetchingAPIs(c chan SyncAPIResponse, data SyncAPIResponse, endpoint st
 	logger.LoggerSync.Infof("Retrying to fetch API data from control plane for the API %q.", data.APIUUID)
 	channelFillPercentage := float64(len(workerPool.internalQueue)) / float64(cap(workerPool.internalQueue)) * 100
 	logger.LoggerSync.Infof("Workerpool channel size as a percentage is : %f", channelFillPercentage)
-	FetchAPIs(&data.APIUUID, data.GatewayLabels, c, endpoint, sendType, nil, queryParamMap)
+	FetchAPIs(&data.APIUUID, data.GatewayLabels, c, endpoint, sendType, apiUUIDList, queryParamMap)

 }
@@ -112,6 +112,7 @@ public API getApi() {
                 tenantDomain == null ? APIConstants.SUPER_TENANT_DOMAIN_NAME : tenantDomain);
         api.setOrganizationId(requestContext.getMatchedAPI().getOrganizationId());
         api.setEnvironmentId(requestContext.getMatchedAPI().getEnvironmentId());
+        api.setApiContext(requestContext.getMatchedAPI().getBasePath());
         return api;
     }

@@ -204,9 +205,7 @@ public String getUserName() {

     @Override
     public String getUserAgentHeader() {
-        // User agent is not required for fault scenario
-        logger.error("Internal Error: User agent header is not required for fault events");
-        return null;
+        return requestContext.getHeaders().get("user-agent");
     }

     @Override
@@ -223,6 +222,7 @@ public Map<String, Object> getProperties() {
         String deploymentType = requestContext.getMatchedAPI().getDeploymentType();
         map.put(AnalyticsConstants.GATEWAY_URL, gwURL);
         map.put(AnalyticsConstants.DEPLOYMENT_TYPE, deploymentType);
+        map.put(AnalyticsConstants.API_METHOD, requestContext.getRequestMethod());
         return map;
     }
 }
@@ -51,7 +51,7 @@ public class EnvVarConfig {
     private static final String DEFAULT_ADAPTER_HOST = "adapter";
     private static final String DEFAULT_ADAPTER_XDS_PORT = "18000";
     private static final String DEFAULT_ENFORCER_LABEL = "enforcer";
-    public static final String DEFAULT_XDS_MAX_MSG_SIZE = "4194304";
+    public static final String DEFAULT_XDS_MAX_MSG_SIZE = "41943040";
     public static final String DEFAULT_XDS_MAX_RETRIES = Integer.toString(Constants.MAX_XDS_RETRIES);
     public static final String DEFAULT_XDS_RETRY_PERIOD = Integer.toString(Constants.XDS_DEFAULT_RETRY);
     public static final String DEFAULT_HOSTNAME = "Unassigned";
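For scale: the old default of 4194304 bytes is 4 x 1024 x 1024 = 4 MiB, while the new default of 41943040 bytes is 40 MiB, a tenfold increase in the maximum xDS message size the enforcer will accept by default.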
@@ -39,6 +39,7 @@ public class AnalyticsConstants {

     public static final String GATEWAY_URL = "x-original-gw-url";
     public static final String DEPLOYMENT_TYPE = "deployment-type";
+    public static final String API_METHOD = "apiMethod";

     public static final int API_THROTTLE_OUT_ERROR_CODE = 900800;
     public static final int HARD_LIMIT_EXCEEDED_ERROR_CODE = 900801;
@@ -123,7 +123,12 @@ public void watchApis() {
         reqObserver = stub.withMaxInboundMessageSize(maxSize).streamApis(new StreamObserver<>() {
             @Override
             public void onNext(DiscoveryResponse response) {
-                logger.info("API event received with version : " + response.getVersionInfo());
+                logger.info("API event received with version : " + response.getVersionInfo() +
+                        " and size(bytes) : " + response.getSerializedSize());
+                if ((double) response.getSerializedSize() / maxSize > 0.80) {
+                    logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
+                            response.getTypeUrl());
+                }
                 logger.debug("Received API discovery response " + response);
                 XdsSchedulerManager.getInstance().stopAPIDiscoveryScheduling();
                 latestReceived = response;
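Each of the enforcer's discovery clients below now applies the configured maximum inbound message size to its stub and logs an error once a streamed response crosses 80% of that limit. The enforcer itself is Java (grpc-java's withMaxInboundMessageSize); a loose Go analogue of the same idea, using grpc-go's MaxCallRecvMsgSize, is sketched here. The 40 MiB constant and the stand-in payload are illustrative assumptions, not the enforcer's actual values.

package main

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"
)

const maxMsgSize = 40 * 1024 * 1024 // 40 MiB, mirroring the new enforcer default

// nearLimit reports whether a received message is over 80% of the configured
// maximum inbound message size, so callers can log it before requests start failing.
func nearLimit(msg proto.Message) bool {
	return float64(proto.Size(msg))/float64(maxMsgSize) > 0.80
}

// dialOptions shows where the limit itself is applied in grpc-go: it is a
// per-call receive option, analogous to withMaxInboundMessageSize in grpc-java.
func dialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
	}
}

func main() {
	// A stand-in payload; real callers would pass the streamed DiscoveryResponse.
	payload, _ := structpb.NewStruct(map[string]interface{}{"hello": "world"})
	fmt.Println("near limit:", nearLimit(payload), "dial options:", len(dialOptions()))
}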
@@ -122,7 +122,8 @@ public void run() {

     public void watchApiList() {
         // TODO: (Praminda) implement a deadline with retries
-        reqObserver = stub.streamApiList(new StreamObserver<DiscoveryResponse>() {
+        int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
+        reqObserver = stub.withMaxInboundMessageSize(maxSize).streamApiList(new StreamObserver<DiscoveryResponse>() {
             @Override
             public void onNext(DiscoveryResponse response) {
                 logger.info("API list event received with version : " + response.getVersionInfo());
@@ -123,10 +123,17 @@ public void run() {

     public void watchApplications() {
         // TODO: (Praminda) implement a deadline with retries
-        reqObserver = stub.streamApplications(new StreamObserver<DiscoveryResponse>() {
+        int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
+        reqObserver = stub.withMaxInboundMessageSize(maxSize)
+                .streamApplications(new StreamObserver<DiscoveryResponse>() {
             @Override
             public void onNext(DiscoveryResponse response) {
-                logger.info("Application creation event received with version : " + response.getVersionInfo());
+                logger.info("Application creation event received with version : " + response.getVersionInfo() +
+                        " and size(bytes) : " + response.getSerializedSize());
+                if ((double) response.getSerializedSize() / maxSize > 0.80) {
+                    logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
+                            response.getTypeUrl());
+                }
                 logger.debug("Received Application discovery response " + response);
                 XdsSchedulerManager.getInstance().stopApplicationDiscoveryScheduling();
                 latestReceived = response;
@@ -123,10 +123,17 @@ public void run() {

     public void watchApplicationKeyMappings() {
         // TODO: (Praminda) implement a deadline with retries
-        reqObserver = stub.streamApplicationKeyMappings(new StreamObserver<DiscoveryResponse>() {
+        int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
+        reqObserver = stub.withMaxInboundMessageSize(maxSize)
+                .streamApplicationKeyMappings(new StreamObserver<DiscoveryResponse>() {
             @Override
             public void onNext(DiscoveryResponse response) {
-                logger.info("Application key generation event received with version : " + response.getVersionInfo());
+                logger.info("Application key generation event received with version : " + response.getVersionInfo() +
+                        " and size(bytes) : " + response.getSerializedSize());
+                if ((double) response.getSerializedSize() / maxSize > 0.80) {
+                    logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
+                            response.getTypeUrl());
+                }
                 logger.debug("Received Application Key Mapping discovery response " + response);
                 XdsSchedulerManager.getInstance().stopApplicationKeyMappingDiscoveryScheduling();
                 latestReceived = response;
@@ -123,10 +123,17 @@ public void run() {

     public void watchApplicationPolicies() {
         // TODO: (Praminda) implement a deadline with retries
-        reqObserver = stub.streamApplicationPolicies(new StreamObserver<DiscoveryResponse>() {
+        int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
+        reqObserver = stub.withMaxInboundMessageSize(maxSize)
+                .streamApplicationPolicies(new StreamObserver<DiscoveryResponse>() {
             @Override
             public void onNext(DiscoveryResponse response) {
-                logger.info("Application policy event received with version : " + response.getVersionInfo());
+                logger.info("Application policy event received with version : " + response.getVersionInfo() +
+                        " and size(bytes) : " + response.getSerializedSize());
+                if ((double) response.getSerializedSize() / maxSize > 0.80) {
+                    logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
+                            response.getTypeUrl());
+                }
                 logger.debug("Received Application Policy discovery response " + response);
                 XdsSchedulerManager.getInstance().stopApplicationPolicyDiscoveryScheduling();
                 latestReceived = response;
@@ -122,10 +122,16 @@ public void run() {

     public void watchKeyManagers() {
         // TODO: implement a deadline with retries
-        reqObserver = stub.streamKeyManagers(new StreamObserver<>() {
+        int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
+        reqObserver = stub.withMaxInboundMessageSize(maxSize).streamKeyManagers(new StreamObserver<>() {
             @Override
             public void onNext(DiscoveryResponse response) {
-                logger.info("Key manager event received with version : " + response.getVersionInfo());
+                logger.info("Key manager event received with version : " + response.getVersionInfo() +
+                        " and size(bytes) : " + response.getSerializedSize());
+                if ((double) response.getSerializedSize() / maxSize > 0.80) {
+                    logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
+                            response.getTypeUrl());
+                }
                 logger.debug("Received KeyManagers discovery response " + response);
                 XdsSchedulerManager.getInstance().stopKeyManagerDiscoveryScheduling();
                 latestReceived = response;
@@ -129,7 +129,12 @@ public void watchRevokedTokens() {
         reqObserver = stub.withMaxInboundMessageSize(maxSize).streamTokens(new StreamObserver<>() {
             @Override
             public void onNext(DiscoveryResponse response) {
-                logger.info("Revoked token event received with version : " + response.getVersionInfo());
+                logger.info("Revoked token event received with version : " + response.getVersionInfo() +
+                        " and size(bytes) : " + response.getSerializedSize());
+                if ((double) response.getSerializedSize() / maxSize > 0.80) {
+                    logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
+                            response.getTypeUrl());
+                }
                 logger.debug("Received revoked tokens response " + response);
                 XdsSchedulerManager.getInstance().stopRevokedTokenDiscoveryScheduling();
                 latestReceived = response;