Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements related XDS communication between enforcer and adapter. #3447

Merged
merged 7 commits into from
Oct 17, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
// OnStreamRequest prints debug logs
func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error {
nodeIdentifier := common.GetNodeIdentifier(request)
if nodeQueueInstance.IsNewNode(nodeIdentifier) {
logger.LoggerEnforcerXdsCallbacks.Infof("stream request on stream id: %d, from node: %s, version: %s",
id, nodeIdentifier, request.VersionInfo)
}
logger.LoggerEnforcerXdsCallbacks.Debugf("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
logger.LoggerEnforcerXdsCallbacks.Infof("stream request on stream id: %d, from node: %s, version: %s, for type: %s",
id, nodeIdentifier, request.GetVersionInfo(), request.GetTypeUrl())
if request.ErrorDetail != nil {
logger.LoggerEnforcerXdsCallbacks.Errorf("Stream request for type %s on stream id: %d Error: %s", request.GetTypeUrl(),
Expand All @@ -82,7 +78,7 @@ func (cb *Callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryReque
func (cb *Callbacks) OnStreamResponse(context context.Context, id int64, request *discovery.DiscoveryRequest,
response *discovery.DiscoveryResponse) {
nodeIdentifier := common.GetNodeIdentifier(request)
logger.LoggerEnforcerXdsCallbacks.Debugf("stream response on stream id: %d node: %s for type: %s version: %s",
logger.LoggerEnforcerXdsCallbacks.Infof("stream response on stream id: %d node: %s for type: %s version: %s",
id, nodeIdentifier, request.GetTypeUrl(), response.GetVersionInfo())
}

Expand Down
6 changes: 3 additions & 3 deletions adapter/internal/eventhub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func LoadSubscriptionData(configFile *config.Config, initialAPIUUIDListMap map[s
} else {
// Keep the iteration going on until a response is recieved.
logger.LoggerSync.Errorf("Error occurred while fetching data from control plane: %v", data.Error)
go func(d response) {
go func(d response, endpoint string, responseType interface{}) {
// Retry fetching from control plane after a configured time interval
if conf.ControlPlane.RetryInterval == 0 {
// Assign default retry interval
Expand All @@ -140,8 +140,8 @@ func LoadSubscriptionData(configFile *config.Config, initialAPIUUIDListMap map[s
logger.LoggerSync.Debugf("Time Duration for retrying: %v", conf.ControlPlane.RetryInterval*time.Second)
time.Sleep(conf.ControlPlane.RetryInterval * time.Second)
logger.LoggerSync.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", conf.ControlPlane.RetryInterval*time.Second)
go InvokeService(url.endpoint, url.responseType, nil, responseChannel, 0)
}(data)
go InvokeService(endpoint, responseType, nil, responseChannel, 0)
}(data, url.endpoint, url.responseType)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ public void watchApis() {
reqObserver = stub.withMaxInboundMessageSize(maxSize).streamApis(new StreamObserver<>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("API event received with version : " + response.getVersionInfo());
logger.info("API event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received API discovery response " + response);
XdsSchedulerManager.getInstance().stopAPIDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public void run() {

public void watchApiList() {
// TODO: (Praminda) implement a deadline with retries
reqObserver = stub.streamApiList(new StreamObserver<DiscoveryResponse>() {
int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
reqObserver = stub.withMaxInboundMessageSize(maxSize).streamApiList(new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("API list event received with version : " + response.getVersionInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,17 @@ public void run() {

public void watchApplications() {
// TODO: (Praminda) implement a deadline with retries
reqObserver = stub.streamApplications(new StreamObserver<DiscoveryResponse>() {
int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
reqObserver = stub.withMaxInboundMessageSize(maxSize)
.streamApplications(new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Application creation event received with version : " + response.getVersionInfo());
logger.info("Application creation event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received Application discovery response " + response);
XdsSchedulerManager.getInstance().stopApplicationDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,17 @@ public void run() {

public void watchApplicationKeyMappings() {
// TODO: (Praminda) implement a deadline with retries
reqObserver = stub.streamApplicationKeyMappings(new StreamObserver<DiscoveryResponse>() {
int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
reqObserver = stub.withMaxInboundMessageSize(maxSize)
.streamApplicationKeyMappings(new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Application key generation event received with version : " + response.getVersionInfo());
logger.info("Application key generation event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received Application Key Mapping discovery response " + response);
XdsSchedulerManager.getInstance().stopApplicationKeyMappingDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,17 @@ public void run() {

public void watchApplicationPolicies() {
// TODO: (Praminda) implement a deadline with retries
reqObserver = stub.streamApplicationPolicies(new StreamObserver<DiscoveryResponse>() {
int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
reqObserver = stub.withMaxInboundMessageSize(maxSize)
.streamApplicationPolicies(new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Application policy event received with version : " + response.getVersionInfo());
logger.info("Application policy event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received Application Policy discovery response " + response);
XdsSchedulerManager.getInstance().stopApplicationPolicyDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ public void watchKeyManagers() {
reqObserver = stub.withMaxInboundMessageSize(maxSize).streamKeyManagers(new StreamObserver<>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Key manager event received with version : " + response.getVersionInfo());
logger.info("Key manager event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received KeyManagers discovery response " + response);
XdsSchedulerManager.getInstance().stopKeyManagerDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ public void watchRevokedTokens() {
reqObserver = stub.withMaxInboundMessageSize(maxSize).streamTokens(new StreamObserver<>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Revoked token event received with version : " + response.getVersionInfo());
logger.info("Revoked token event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received revoked tokens response " + response);
XdsSchedulerManager.getInstance().stopRevokedTokenDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,17 @@ public void run() {

public void watchSubscriptions() {
// TODO: (Praminda) implement a deadline with retries
reqObserver = stub.streamSubscriptions(new StreamObserver<DiscoveryResponse>() {
int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
reqObserver = stub.withMaxInboundMessageSize(maxSize)
.streamSubscriptions(new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Subscription event received with version : " + response.getVersionInfo());
logger.info("Subscription event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received Subscription discovery response " + response);
XdsSchedulerManager.getInstance().stopSubscriptionDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,16 @@ public void run() {

public void watchSubscriptionPolicies() {
// TODO: (Praminda) implement a deadline with retries
reqObserver = stub.streamSubscriptionPolicies(new StreamObserver<>() {
int maxSize = Integer.parseInt(ConfigHolder.getInstance().getEnvVarConfig().getXdsMaxMsgSize());
reqObserver = stub.withMaxInboundMessageSize(maxSize).streamSubscriptionPolicies(new StreamObserver<>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Subscription policy event received with version : " + response.getVersionInfo());
logger.info("Subscription policy event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size for the type : " +
response.getTypeUrl());
}
logger.debug("Received Subscription policy discovery response " + response);
XdsSchedulerManager.getInstance().stopSubscriptionPolicyDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ public void watchThrottleData() {
.streamThrottleData(new StreamObserver<>() {
@Override
public void onNext(DiscoveryResponse response) {
logger.info("Throttle data event received with version : " + response.getVersionInfo());
logger.info("Throttle data event received with version : " + response.getVersionInfo() +
" and size(bytes) : " + response.getSerializedSize());
if ((double) response.getSerializedSize() / maxSize > 0.80) {
logger.error("Current response size exceeds 80% of the maximum message size " +
"for the type : " + response.getTypeUrl());
}
logger.debug("Received ThrottleData discovery response " + response);
XdsSchedulerManager.getInstance().stopThrottleDataDiscoveryScheduling();
latestReceived = response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.logging.log4j.Logger;
import org.wso2.choreo.connect.discovery.subscription.APIs;
import org.wso2.choreo.connect.enforcer.constants.APIConstants;
import org.wso2.choreo.connect.enforcer.discovery.ApiListDiscoveryClient;
import org.wso2.choreo.connect.enforcer.discovery.ApplicationDiscoveryClient;
import org.wso2.choreo.connect.enforcer.discovery.ApplicationKeyMappingDiscoveryClient;
import org.wso2.choreo.connect.enforcer.discovery.ApplicationPolicyDiscoveryClient;
Expand Down Expand Up @@ -151,7 +150,8 @@ public ApiPolicy getApiPolicyByName(String policyName) {
private void initializeLoadingTasks() {
SubscriptionDiscoveryClient.getInstance().watchSubscriptions();
ApplicationDiscoveryClient.getInstance().watchApplications();
ApiListDiscoveryClient.getInstance().watchApiList();
// Disabled API List discovery as it is not set as per current adapter implementation.
// ApiListDiscoveryClient.getInstance().watchApiList();
ApplicationPolicyDiscoveryClient.getInstance().watchApplicationPolicies();
SubscriptionPolicyDiscoveryClient.getInstance().watchSubscriptionPolicies();
ApplicationKeyMappingDiscoveryClient.getInstance().watchApplicationKeyMappings();
Expand Down
Loading