diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java index 94ded925d4..675cc3e360 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java @@ -30,6 +30,8 @@ import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQTask; import org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager; +import java.util.Objects; + import static org.wso2.carbon.inbound.endpoint.common.Constants.SUPER_TENANT_DOMAIN_NAME; /** @@ -44,6 +46,7 @@ public abstract class InboundOneTimeTriggerRequestProcessor implements InboundRe protected SynapseEnvironment synapseEnvironment; protected String name; protected boolean coordination; + protected boolean startInPausedMode; private OneTimeTriggerInboundRunner inboundRunner; private Thread runningThread; @@ -134,4 +137,27 @@ public void destroy(boolean removeTask) { } } } + + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + if (Objects.nonNull(startUpController)) { + return !startUpController.isTaskActive(); + } else if (Objects.nonNull(runningThread)) { + return !runningThread.isAlive(); + } + return true; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java index e9c1f35f09..910dbac30f 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java @@ -27,6 +27,7 @@ import org.wso2.carbon.inbound.endpoint.persistence.InboundEndpointsDataStore; import org.wso2.carbon.inbound.endpoint.protocol.jms.JMSTask; import org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager; +import org.wso2.micro.integrator.ntask.core.TaskUtils; import java.util.ArrayList; import java.util.HashMap; @@ -43,6 +44,7 @@ public abstract class InboundRequestProcessorImpl implements InboundRequestProce protected long interval; protected String name; protected boolean coordination; + protected boolean startInPausedMode; private List startUpControllersList = new ArrayList<>(); private HashMap inboundRunnersThreadsMap = new HashMap<>(); @@ -63,8 +65,8 @@ public InboundRequestProcessorImpl() { * @param endpointPostfix */ protected void start(InboundTask task, String endpointPostfix) { - log.info("Starting the inbound endpoint " + name + ", with coordination " + coordination + ". Interval : " - + interval + ". Type : " + endpointPostfix); + log.info("Starting the inbound endpoint [" + name + "]" + (startInPausedMode ? " in suspended mode" : "") + + ", with coordination " + coordination + ". Interval : " + interval + ". Type : " + endpointPostfix); if (coordination) { try { TaskDescription taskDescription = new TaskDescription(); @@ -78,6 +80,10 @@ protected void start(InboundTask task, String endpointPostfix) { taskDescription.setIntervalInMs(true); taskDescription.addResource(TaskDescription.INSTANCE, task); taskDescription.addResource(TaskDescription.CLASSNAME, task.getClass().getName()); + taskDescription.setTaskImplClassName(task.getClass().getName()); + taskDescription.addProperty(TaskUtils.TASK_OWNER_PROPERTY, TaskUtils.TASK_BELONGS_TO_INBOUND_ENDPOINT); + taskDescription.addProperty(TaskUtils.TASK_OWNER_NAME, name); + taskDescription.addProperty(TaskUtils.START_IN_PAUSED_MODE, String.valueOf(startInPausedMode)); StartUpController startUpController = new StartUpController(); startUpController.setTaskDescription(taskDescription); startUpController.init(synapseEnvironment); @@ -96,12 +102,22 @@ protected void start(InboundTask task, String endpointPostfix) { } } else { - startInboundRunnerThread(task, Constants.SUPER_TENANT_DOMAIN_NAME, false); + startInboundRunnerThread(task, Constants.SUPER_TENANT_DOMAIN_NAME, false, startInPausedMode); } } - private void startInboundRunnerThread(InboundTask task, String tenantDomain, boolean mgrOverride) { - InboundRunner inboundRunner = new InboundRunner(task, interval, tenantDomain, mgrOverride); + /** + * Starts a new thread to execute the given inbound task by creating a new {@link InboundRunner} instance + * and running it in a separate thread. + * + * @param task The inbound task to be executed by the thread. + * @param tenantDomain The tenant domain under which the task should be run. + * @param mgrOverride A flag indicating whether the manager override is enabled. + * @param startInPausedMode A flag indicating whether the task should start in paused mode. + */ + private void startInboundRunnerThread(InboundTask task, String tenantDomain, boolean mgrOverride, + boolean startInPausedMode) { + InboundRunner inboundRunner = new InboundRunner(task, interval, tenantDomain, mgrOverride, startInPausedMode); Thread runningThread = new Thread(inboundRunner); inboundRunnersThreadsMap.put(runningThread, inboundRunner); runningThread.start(); @@ -140,4 +156,105 @@ public void destroy() { } } + /** + * Activates the Inbound Endpoint by activating any associated startup controllers + * or resuming inbound runner threads if no startup controllers are present. + * + *

This method first checks if there are any startup controllers. If there are, it attempts to activate + * each controller and sets the success flag accordingly. If no startup controllers are present, it resumes + * any inbound runner threads that may be running. The method returns a boolean indicating whether + * the activation was successful.

+ * + * @return {@code true} if at least one associated startup controller was successfully activated or inbound runner + * threads were resumed; {@code false} if activation task failed for all the startup controllers or + * if no startup controllers or inbound runner threads present. + */ + @Override + public boolean activate() { + log.info("Activating the Inbound Endpoint [" + name + "]."); + + boolean isSuccessfullyActivated = false; + if (!startUpControllersList.isEmpty()) { + for (StartUpController sc : startUpControllersList) { + if (sc.activateTask()) { + isSuccessfullyActivated = true; + } else { + if (log.isDebugEnabled()) { + log.debug("Failed to activate the consumer: " + sc.getTaskDescription().getName()); + } + } + } + } else if (!inboundRunnersThreadsMap.isEmpty()) { + for (Map.Entry threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) { + InboundRunner inboundRunner = (InboundRunner) ((Map.Entry) threadInboundRunnerEntry).getValue(); + inboundRunner.resume(); + } + isSuccessfullyActivated = true; + } + return isSuccessfullyActivated; + } + + /** + * Deactivates the Inbound Endpoint by deactivating any associated startup controllers + * or pausing inbound runner threads if no startup controllers are present. + * + *

This method first checks if there are any startup controllers. If there are, it attempts to deactivate + * each controller and sets the success flag accordingly. If no startup controllers are present, it pauses + * any inbound runner threads that may be running. The method returns a boolean indicating whether + * the deactivation was successful.

+ * + * @return {@code true} if all associated startup controllers were successfully deactivated or inbound runner threads + * were paused; {@code false} if any deactivation task failed. + */ + @Override + public boolean deactivate() { + log.info("Deactivating the Inbound Endpoint [" + name + "]."); + + boolean isSuccessfullyDeactivated = true; + if (!startUpControllersList.isEmpty()) { + for (StartUpController sc : startUpControllersList) { + if (!sc.deactivateTask()) { + if (log.isDebugEnabled()) { + log.debug("Failed to deactivate the consumer: " + sc.getTaskDescription().getName()); + } + isSuccessfullyDeactivated = false; + } + } + } else if (!inboundRunnersThreadsMap.isEmpty()) { + for (Map.Entry threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) { + InboundRunner inboundRunner = (InboundRunner) ((Map.Entry) threadInboundRunnerEntry).getValue(); + inboundRunner.pause(); + } + } + return isSuccessfullyDeactivated; + } + + /** + * Checks if the Inbound Endpoint is deactivated. This method checks the status of any associated + * startup controllers or inbound runner threads. The endpoint is considered deactivated if all + * startup controllers are inactive and all inbound runner threads are paused. + * + * @return {@code true} if all startup controllers are inactive and all inbound runner threads are paused; + * {@code false} if any startup controller is active or any inbound runner thread is not paused. + */ + @Override + public boolean isDeactivated() { + if (!startUpControllersList.isEmpty()) { + for (StartUpController sc : startUpControllersList) { + if (sc.isTaskActive()) { + // Inbound Endpoint is considered active if at least one consumer is alive. + return false; + } + } + } else if (!inboundRunnersThreadsMap.isEmpty()) { + for (Map.Entry threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) { + InboundRunner inboundRunner = (InboundRunner) ((Map.Entry) threadInboundRunnerEntry).getValue(); + if (!inboundRunner.isPaused()) { + // Inbound Endpoint is considered active if at least one consumer is alive. + return false; + } + } + } + return true; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java index 341fb0588c..8ffda4366f 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java @@ -32,6 +32,7 @@ public class InboundRunner implements Runnable { private long interval; private volatile boolean execute = true; + private volatile boolean isPaused; private volatile boolean init = false; // Following will be used to calculate the sleeping interval private long lastRuntime; @@ -43,19 +44,57 @@ public class InboundRunner implements Runnable { private static final String CLUSTERING_PATTERN = "clusteringPattern"; private static final String CLUSTERING_PATTERN_WORKER_MANAGER = "WorkerManager"; private static final Log log = LogFactory.getLog(InboundRunner.class); + private final Object lock = new Object(); - public InboundRunner(InboundTask task, long interval, String tenantDomain, boolean mgrOverride) { + public InboundRunner(InboundTask task, long interval, String tenantDomain, boolean mgrOverride, boolean startInPausedMode) { this.task = task; this.interval = interval; this.tenantDomain = tenantDomain; this.runOnManagerOverride = mgrOverride; + this.isPaused = startInPausedMode; + } + + /** + * Pauses the execution of the thread. + *

+ * This method sets the {@code isPaused} flag to {@code true}, indicating that + * the thread should pause its execution. Threads can check this flag and + * enter a wait state if necessary. + *

+ */ + public void pause() { + synchronized (lock) { + isPaused = true; + } + } + + /** + * Resumes the execution of a paused thread. + *

+ * This method sets the {@code isPaused} flag to {@code false} and notifies + * all threads waiting on the {@code lock} object, allowing the thread to continue execution. + *

+ */ + public void resume() { + synchronized (lock) { + isPaused = false; + lock.notifyAll(); // Wake up the thread + } + } + + public boolean isPaused() { + return isPaused; } /** * Exit the running while loop and terminate the thread */ - protected void terminate() { - execute = false; + public void terminate() { + synchronized (lock) { + execute = false; + isPaused = false; // Ensure the thread is not stuck in pause + lock.notifyAll(); // Wake up the thread to exit + } } @Override @@ -65,7 +104,22 @@ public void run() { log.debug("Configuration context loaded. Running the Inbound Endpoint."); // Run the poll cycles while (execute) { - log.debug("Executing the Inbound Endpoint."); + synchronized (lock) { + while (isPaused && execute) { + try { + lock.wait(); // Pause the thread + } catch (InterruptedException e) { + if (log.isDebugEnabled()) { + log.debug("Inbound thread got interrupted while paused, but continuing..."); + } + } + } + } + if (!execute) break; // Exit right away if the thread is terminated + + if (log.isDebugEnabled()) { + log.debug("Executing the Inbound Endpoint."); + } lastRuntime = getTime(); try { task.taskExecute(); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java index cf1032d152..1a65416c53 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java @@ -1012,9 +1012,17 @@ protected Properties getInboundProperties() { void destroy() { fsManager.close(); + this.close(); + } + + void close() { isClosed = true; } + void start() { + isClosed = false; + } + private String sanitizeFileUriWithSub(String originalFileUri) { String[] splitUri = originalFileUri.split("\\?"); splitUri[0] = splitUri[0].substring(0, splitUri[0].length() - INCLUDE_SUB_DIR_SYMBOL_LENGTH); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java index ab4efc2608..5be6b2c18d 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java @@ -62,13 +62,15 @@ public VFSProcessor(InboundProcessorParams params) { this.injectingSeq = params.getInjectingSeq(); this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); + this.startInPausedMode = params.startInPausedMode(); } /** * This will be called at the time of synapse artifact deployment. */ public void init() { - log.info("Inbound file listener " + name + " starting ..."); + log.info("Inbound file listener [" + name + "] is initializing" + + (this.startInPausedMode ? " but will remain in suspended mode..." : "...")); fileScanner = new FilePollingConsumer(vfsProperties, name, synapseEnvironment, interval); fileScanner.registerHandler( new FileInjectHandler(injectingSeq, onErrorSeq, sequential, synapseEnvironment, vfsProperties)); @@ -95,6 +97,18 @@ public void update() { // This will not be called for inbound endpoints } + @Override + public boolean deactivate() { + fileScanner.close(); + return super.deactivate(); + } + + @Override + public boolean activate() { + fileScanner.start(); + return super.activate(); + } + /** * Remove inbound endpoints. * diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java index 5419693dac..610e4cd300 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java @@ -38,6 +38,7 @@ public class GenericEventBasedListener extends InboundOneTimeTriggerEventBasedPr private String onErrorSeq; private String classImpl; private boolean sequential; + private boolean startInPausedMode; private static final Log log = LogFactory.getLog(GenericEventBasedListener.class); private static final String ENDPOINT_POSTFIX = "CLASS" + COMMON_ENDPOINT_POSTFIX; @@ -71,9 +72,24 @@ public GenericEventBasedListener(InboundProcessorParams params) { this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); this.classImpl = params.getClassImpl(); + this.startInPausedMode = params.startInPausedMode(); } public void init() { + /* + * The activate/deactivate functionality is not currently implemented + * for this Inbound Endpoint type. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("Inbound event based listener " + name + " for class " + classImpl + " starting ..."); try { Class c = Class.forName(classImpl); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java index b68f462692..be3e683d45 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java @@ -95,4 +95,22 @@ protected static void handleException(String msg, Exception e) { log.error(msg, e); throw new SynapseException(msg, e); } + + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java index 42b111d601..30cea4c8f7 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java @@ -76,9 +76,24 @@ public GenericProcessor(InboundProcessorParams params) { this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); this.classImpl = params.getClassImpl(); + this.startInPausedMode = params.startInPausedMode(); } public void init() { + /* + * The activate/deactivate functionality is not currently implemented + * for this Inbound Endpoint type. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("Inbound listener " + name + " for class " + classImpl + " starting ..."); try { Class c = Class.forName(classImpl); @@ -138,4 +153,15 @@ public void update() { start(); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java index 6b4c901815..9ba943b43f 100755 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java @@ -31,13 +31,16 @@ import org.wso2.carbon.inbound.endpoint.protocol.grpc.util.Event; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.TimeUnit; public class InboundGRPCListener implements InboundRequestProcessor { private int port; + private String name; private GRPCInjectHandler injectHandler; private static final Log log = LogFactory.getLog(InboundGRPCListener.class.getName()); private Server server; + private boolean startInPausedMode; public InboundGRPCListener(InboundProcessorParams params) { String injectingSeq = params.getInjectingSeq(); @@ -51,11 +54,27 @@ public InboundGRPCListener(InboundProcessorParams params) { " property. Setting the port as " + InboundGRPCConstants.DEFAULT_INBOUND_ENDPOINT_GRPC_PORT); port = InboundGRPCConstants.DEFAULT_INBOUND_ENDPOINT_GRPC_PORT; } + name = params.getName(); injectHandler = new GRPCInjectHandler(injectingSeq, onErrorSeq, false, synapseEnvironment); + startInPausedMode = params.startInPausedMode(); } public void init() { try { + /* + * The activate/deactivate functionality for the GRPC protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for GRPC listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } this.start(); } catch (IOException e) { throw new SynapseException("IOException when starting gRPC server: " + e.getMessage(), e); @@ -70,6 +89,26 @@ public void destroy() { } } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + if (Objects.isNull(server)) { + return true; + } + return server.isTerminated(); + } + public void start() throws IOException { if (server != null) { throw new IllegalStateException("gRPC Listener Server already started"); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java index ce6ef34f3b..f35ee81045 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java @@ -32,13 +32,29 @@ public class InboundHL7Listener implements InboundRequestProcessor { private int port; private InboundProcessorParams params; + private boolean startInPausedMode; public InboundHL7Listener(InboundProcessorParams params) { this.params = params; + startInPausedMode = params.startInPausedMode(); } @Override public void init() { + /* + * The activate/deactivate functionality for the HL7 protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HL7 listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + params.getName() + "] is currently suspended."); + return; + } if (!InboundHL7IOReactor.isStarted()) { log.info("Starting MLLP Transport Reactor"); try { @@ -67,4 +83,21 @@ public void destroy() { HL7EndpointManager.getInstance().closeEndpoint(port); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return !InboundHL7IOReactor.isStarted(); + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java index 5bec1eb17e..230871843e 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java @@ -44,6 +44,7 @@ public class InboundHttpListener implements InboundRequestProcessor { private String name; private int port; private InboundProcessorParams processorParams; + protected boolean startInPausedMode; public InboundHttpListener(InboundProcessorParams params) { processorParams = params; @@ -58,10 +59,25 @@ public InboundHttpListener(InboundProcessorParams params) { handleException("Please provide port number as integer instead of port " + portParam, e); } name = params.getName(); + startInPausedMode = params.startInPausedMode(); } @Override public void init() { + /* + * The activate/deactivate functionality for the HTTP protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTP listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } if (isPortUsedByAnotherApplication(port)) { log.warn("Port " + port + " used by inbound endpoint " + name + " is already used by another application " + "hence undeploying inbound endpoint"); @@ -77,6 +93,24 @@ public void destroy() { HTTPEndpointManager.getInstance().closeEndpoint(port); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return !HTTPEndpointManager.getInstance().isEndpointRunning(name, port); + } + protected void handleException(String msg, Exception e) { log.error(msg, e); throw new SynapseException(msg, e); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java index 93f779d276..4a9eb9f967 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java @@ -481,4 +481,23 @@ public boolean isAnyInternalHttpApiEnabled() { public boolean isAnyInternalHttpsApiEnabled() { return internalHttpsApiEnabled; } + + /** + * Checks if a specified endpoint is running on a given port. + * This method retrieves the name of the endpoint listening on the given port and compares it + * with the provided name. If a match is found, it checks if the endpoint is actively running. + * + * @param name the name of the endpoint to check + * @param port the port number where the endpoint is expected to be running + * @return {@code true} if the endpoint with the specified name is running on the given port; + * {@code false} otherwise + */ + public boolean isEndpointRunning(String name, int port) { + + String epName = dataStore.getListeningEndpointName(port, SUPER_TENANT_DOMAIN_NAME); + if (epName != null && epName.equalsIgnoreCase(name)) { + return PassThroughInboundEndpointHandler.isEndpointRunning(port); + } + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java index 71c125722f..b28e0d6c25 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java @@ -66,6 +66,20 @@ public InboundHttpsListener(InboundProcessorParams params) { @Override public void init() { + /* + * The activate/deactivate functionality for the HTTPS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTPS listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } if (isPortUsedByAnotherApplication(port)) { log.warn("Port " + port + "used by inbound endpoint " + name + " is already used by another application " + "hence undeploying inbound endpoint"); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java index ba59a1e124..f02cb9e44c 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java @@ -17,12 +17,16 @@ */ package org.wso2.carbon.inbound.endpoint.protocol.httpssecurewebsocket; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.synapse.inbound.InboundProcessorParams; import org.wso2.carbon.inbound.endpoint.protocol.httpwebsocket.InboundHttpWebsocketListener; import org.wso2.carbon.inbound.endpoint.protocol.httpwebsocket.management.HttpWebsocketEndpointManager; public class InboundHttpsSecureWebsocketListener extends InboundHttpWebsocketListener { + private static final Log LOGGER = LogFactory.getLog(InboundHttpsSecureWebsocketListener.class); + public InboundHttpsSecureWebsocketListener(InboundProcessorParams params) { super(params); @@ -31,6 +35,20 @@ public InboundHttpsSecureWebsocketListener(InboundProcessorParams params) { @Override public void init() { - HttpWebsocketEndpointManager.getInstance().startSSLEndpoint(port, name, processorParams); + /* + * The activate/deactivate functionality for the HTTPS-WSS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTPS-WSS listeners is implemented. + */ + if (startInPausedMode) { + LOGGER.info("Inbound endpoint [" + name + "] is currently suspended."); + } else { + HttpWebsocketEndpointManager.getInstance().startSSLEndpoint(port, name, processorParams); + } } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java index 95d0c04068..de700dd88f 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java @@ -31,6 +31,7 @@ public class InboundHttpWebsocketListener implements InboundRequestProcessor { protected final String name; protected int port; protected InboundProcessorParams processorParams; + protected boolean startInPausedMode; public InboundHttpWebsocketListener(InboundProcessorParams params) { @@ -43,12 +44,27 @@ public InboundHttpWebsocketListener(InboundProcessorParams params) { handleException("Validation failed for the port parameter " + portParam, e); } name = params.getName(); + startInPausedMode = params.startInPausedMode(); } @Override public void init() { - HttpWebsocketEndpointManager.getInstance().startEndpoint(port, name, processorParams); + /* + * The activate/deactivate functionality for the HTTP-WS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTP-WS listener is implemented. + */ + if (startInPausedMode) { + LOGGER.info("Inbound endpoint [" + name + "] is currently suspended."); + } else { + HttpWebsocketEndpointManager.getInstance().startEndpoint(port, name, processorParams); + } } @Override @@ -57,6 +73,24 @@ public void destroy() { HttpWebsocketEndpointManager.getInstance().closeEndpoint(port); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return !HttpWebsocketEndpointManager.getInstance().isEndpointRunning(name, port); + } + protected void handleException(String msg, Exception e) { LOGGER.error(msg, e); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java index ec571eedb8..b2f99c89c2 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java @@ -138,6 +138,11 @@ private boolean handleExistingEndpointOnSamePort(int port, String name) { } } + public boolean isEndpointRunning(String name, int port) { + String epName = dataStore.getListeningEndpointName(port, SUPER_TENANT_DOMAIN_NAME); + return epName.equalsIgnoreCase(name); + } + /** * Checks if the given port is available to use. * diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java index 5048b2300b..e236bb767c 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java @@ -46,6 +46,7 @@ public class JMSProcessor extends InboundRequestProcessorImpl implements TaskSta public JMSProcessor(InboundProcessorParams params) { this.name = params.getName(); + this.startInPausedMode = params.startInPausedMode(); this.jmsProperties = params.getProperties(); String inboundEndpointInterval = jmsProperties.getProperty(PollingConstants.INBOUND_ENDPOINT_INTERVAL); @@ -84,6 +85,19 @@ public JMSProcessor(InboundProcessorParams params) { * This will be called at the time of synapse artifact deployment. */ public void init() { + /* + * The activate/deactivate functionality for the JMS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for JMS listener is implemented. + */ + if (startInPausedMode) { + return; + } log.info("Initializing inbound JMS listener for inbound endpoint " + name); for (int consumers = 0; consumers < concurrentConsumers; consumers++) { JMSPollingConsumer jmsPollingConsumer = new JMSPollingConsumer(jmsProperties, interval, name); @@ -136,4 +150,16 @@ public void destroy(boolean removeTask) { destroy(); } } + + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java index c619fb6cc4..0fc7085bef 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java @@ -157,4 +157,5 @@ public Object poll() { public Properties getInboundProperties() { return kafkaProperties; } + } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java index bb2af7b849..7cab410648 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java @@ -77,12 +77,26 @@ public KAFKAProcessor(InboundProcessorParams params) { this.injectingSeq = params.getInjectingSeq(); this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); + this.startInPausedMode = params.startInPausedMode(); } /** * This will be called at the time of synapse artifact deployment. */ public void init() { + /* + * The activate/deactivate functionality for the Kafka Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for Kafka listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("Initializing inbound KAFKA listener for destination " + name); try { pollingConsumer = new KAFKAPollingConsumer(kafkaProperties, interval, name); @@ -136,6 +150,18 @@ public void destroy() { super.destroy(); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + /** * Remove inbound endpoints. * diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java index 31849d2ea5..b0008b8d44 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java @@ -77,6 +77,7 @@ public MqttListener(InboundProcessorParams params) { this.synapseEnvironment = params.getSynapseEnvironment(); this.mqttProperties = params.getProperties(); this.params = params; + this.startInPausedMode = params.startInPausedMode(); //assign default value if sequential mode parameter is not present this.sequential = true; @@ -153,6 +154,20 @@ public void destroy(boolean removeTask) { @Override public void init() { + /* + * The activate/deactivate functionality for the MQTT protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for MQTT listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("MQTT inbound endpoint " + name + " initializing ..."); initAsyncClient(); start(); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java index 68e7027780..9c8a29abaf 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java @@ -26,6 +26,7 @@ import org.wso2.carbon.inbound.endpoint.common.InboundOneTimeTriggerRequestProcessor; import org.wso2.carbon.inbound.endpoint.protocol.PollingConstants; +import java.util.Objects; import java.util.Properties; /** @@ -47,6 +48,7 @@ public RabbitMQListener(InboundProcessorParams params) { this.name = params.getName(); this.injectingSeq = params.getInjectingSeq(); this.onErrorSeq = params.getOnErrorSeq(); + this.startInPausedMode = params.startInPausedMode(); this.synapseEnvironment = params.getSynapseEnvironment(); this.rabbitmqProperties = params.getProperties(); @@ -72,12 +74,27 @@ public void destroy() { @Override public void destroy(boolean removeTask) { - rabbitMQConsumer.close(); + if (Objects.nonNull(rabbitMQConsumer)) { + rabbitMQConsumer.close(); + } super.destroy(removeTask); } @Override public void init() { + /* + * The activate/deactivate functionality for the RabbitMQ Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for RabbitMQ listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("RABBITMQ inbound endpoint " + name + " initializing ..."); rabbitMQConsumer = new RabbitMQConsumer(rabbitMQConnectionFactory, rabbitmqProperties, injectHandler); rabbitMQConsumer.setInboundName(name); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java index 2ffa03c517..ee3825147d 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java @@ -45,11 +45,24 @@ public InboundSecureWebsocketListener(InboundProcessorParams params) { handleException("Validation failed for the port parameter " + portParam, e); } name = params.getName(); - + this.startInPausedMode = params.startInPausedMode(); } @Override public void init() { + /* + * The activate/deactivate functionality for the WSS Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for WSS listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } int offsetPort = port + PersistenceUtils.getPortOffset(processorParams.getProperties()); WebsocketEndpointManager.getInstance().startSSLEndpoint(offsetPort, name, processorParams); } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java index a9429e74c7..1ae1170995 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java @@ -36,6 +36,7 @@ public class InboundWebsocketListener implements InboundRequestProcessor { private String name; private int port; private InboundProcessorParams processorParams; + protected boolean startInPausedMode; public InboundWebsocketListener(InboundProcessorParams params) { processorParams = params; @@ -47,10 +48,24 @@ public InboundWebsocketListener(InboundProcessorParams params) { handleException("Validation failed for the port parameter " + portParam, e); } name = params.getName(); + this.startInPausedMode = params.startInPausedMode(); } @Override public void init() { + /* + * The activate/deactivate functionality for the WS Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for WS listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } int offsetPort = port + PersistenceUtils.getPortOffset(processorParams.getProperties()); WebsocketEndpointManager.getInstance().startEndpoint(offsetPort, name, processorParams); } @@ -62,6 +77,22 @@ public void destroy() { WebsocketEndpointManager.getInstance().closeEndpoint(offsetPort); } + @Override + public boolean activate() { + return false; + } + + @Override + public boolean deactivate() { + return false; + } + + @Override + public boolean isDeactivated() { + + return !WebsocketEndpointManager.getInstance().isEndpointRunning(name, port); + } + protected void handleException(String msg, Exception e) { log.error(msg, e); throw new SynapseException(msg, e); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java index 8d81c4c079..dfda28b30c 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java @@ -309,4 +309,13 @@ public InboundWebsocketSourceHandler getSourceHandler() { public void setSourceHandler(InboundWebsocketSourceHandler sourceHandler) { this.sourceHandler = sourceHandler; } + + public boolean isEndpointRunning(String name, int port) { + + String epName = dataStore.getListeningEndpointName(port, SUPER_TENANT_DOMAIN_NAME); + if (epName != null && epName.equalsIgnoreCase(name)) { + return WebsocketEventExecutorManager.getInstance().isRegisteredExecutor(port); + } + return false; + } } diff --git a/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java index ec879d84ef..bbb05fc89d 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java @@ -26,6 +26,7 @@ import org.wso2.micro.core.ServerStartupHandler; import org.wso2.micro.integrator.mediation.ntask.internal.NtaskService; import org.wso2.micro.integrator.ntask.core.TaskInfo; +import org.wso2.micro.integrator.ntask.core.TaskUtils; import org.wso2.micro.integrator.ntask.core.impl.LocalTaskActionListener; import org.wso2.micro.integrator.ntask.core.service.TaskService; @@ -34,6 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -104,7 +106,10 @@ public boolean schedule(TaskDescription taskDescription) { if (logger.isDebugEnabled()) { logger.debug("Submitting task [ " + taskId(taskDescription) + " ] to the task manager."); } - taskManager.handleTask(taskInfo.getName()); + boolean scheduledInPausedMode = + Objects.nonNull(taskDescription.getProperty(TaskUtils.START_IN_PAUSED_MODE)) + && Boolean.parseBoolean((String) taskDescription.getProperty(TaskUtils.START_IN_PAUSED_MODE)); + taskManager.handleTask(taskInfo.getName(), scheduledInPausedMode); } removeTask(taskDescription); } @@ -624,8 +629,7 @@ private boolean checkTaskRunning(String taskName) { return false; } try { - return taskManager.getTaskState(taskName) - .equals(org.wso2.micro.integrator.ntask.core.TaskManager.TaskState.NORMAL); + return taskManager.isTaskRunning(taskName); } catch (Exception e) { logger.error("Cannot return task status [" + taskName + "]. Error: " + e.getLocalizedMessage(), e); diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java index 2feac69f5b..f1f3fe3679 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java @@ -22,7 +22,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.synapse.commons.util.MiscellaneousUtil; import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.inbound.InboundEndpoint; import org.apache.synapse.message.processor.MessageProcessor; +import org.apache.synapse.task.TaskDescription; +import org.apache.synapse.task.TaskDescriptionRepository; import org.wso2.micro.integrator.coordination.ClusterCoordinator; import org.wso2.micro.integrator.core.util.MicroIntegratorBaseUtils; import org.wso2.micro.integrator.ntask.common.TaskException; @@ -32,13 +35,13 @@ import org.wso2.micro.integrator.ntask.coordination.task.resolver.TaskLocationResolver; import org.wso2.micro.integrator.ntask.coordination.task.store.TaskStore; import org.wso2.micro.integrator.ntask.coordination.task.store.cleaner.TaskStoreCleaner; +import org.wso2.micro.integrator.ntask.core.TaskUtils; import org.wso2.micro.integrator.ntask.core.impl.standalone.ScheduledTaskManager; import org.wso2.micro.integrator.ntask.core.internal.DataHolder; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -162,7 +165,7 @@ private void pauseDeactivatedTasks() throws TaskCoordinationException { + "in this node or an invalid entry, hence ignoring it."); } }); - cleanUpMessageProcessors(pausedTasks); + notifyOnPause(pausedTasks); taskStore.updateTaskState(pausedTasks, CoordinatedTask.States.PAUSED); } @@ -173,13 +176,13 @@ private void pauseDeactivatedTasks() throws TaskCoordinationException { */ private void addFailedTasks() throws TaskCoordinationException { - List failedTasks = taskManager.getAdditionFailedTasks(); + List failedTasks = taskManager.getAdditionFailedTasks(); if (LOG.isDebugEnabled()) { LOG.debug("Following list of tasks were found in the failed list."); failedTasks.forEach(LOG::debug); } - for (String task : failedTasks) { - taskStore.addTaskIfNotExist(task); + for (ScheduledTaskManager.TaskEntry task : failedTasks) { + taskStore.addTaskIfNotExist(task.getName(), task.getState()); taskManager.removeTaskFromAdditionFailedTaskList(task); if (LOG.isDebugEnabled()) { LOG.debug("Successfully added the failed task [" + task + "]"); @@ -251,21 +254,36 @@ public synchronized void resolveUnassignedNotCompletedTasksAndUpdateStore() thro taskStore.updateAssignmentAndState(tasksToBeUpdated); } - private void cleanUpMessageProcessors(List pausedTasks) { - + private void notifyOnPause(List pausedTasks) { Set completedProcessors = new HashSet(); + Set completedInboundEndpoints = new HashSet(); + SynapseEnvironment synapseEnvironment = MicroIntegratorBaseUtils.getSynapseEnvironment(); + TaskDescriptionRepository taskRepo = synapseEnvironment.getTaskManager().getTaskDescriptionRepository(); pausedTasks.forEach(task -> { if (MiscellaneousUtil.isTaskOfMessageProcessor(task)) { String messageProcessorName = MiscellaneousUtil.getMessageProcessorName(task); if (!completedProcessors.contains(messageProcessorName)) { - SynapseEnvironment synapseEnvironment = MicroIntegratorBaseUtils.getSynapseEnvironment(); - MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration().getMessageProcessors(). - get(messageProcessorName); + MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration() + .getMessageProcessors().get(messageProcessorName); if (messageProcessor != null) { messageProcessor.cleanUpDeactivatedProcessors(); } completedProcessors.add(messageProcessorName); } + } else { + TaskDescription taskDescription = taskRepo.getTaskDescription(task); + if (taskDescription.getProperty(TaskUtils.TASK_OWNER_PROPERTY) == TaskUtils.TASK_BELONGS_TO_INBOUND_ENDPOINT) { + String inboundEndpointName = (String) taskDescription.getProperty(TaskUtils.TASK_OWNER_NAME); + + if (!completedInboundEndpoints.contains(inboundEndpointName)) { + InboundEndpoint inboundEndpoint = synapseEnvironment.getSynapseConfiguration() + .getInboundEndpoint(inboundEndpointName); + if (inboundEndpoint != null) { + inboundEndpoint.updateInboundEndpointState(true); + } + completedInboundEndpoints.add(inboundEndpointName); + } + } } }); } diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java index b432f01e26..575367dedf 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java @@ -141,7 +141,18 @@ public List getAllAssignedIncompleteTasks() throws TaskCoordina */ public void addTaskIfNotExist(String task) throws TaskCoordinationException { - rdmbsConnector.addTaskIfNotExist(task); + rdmbsConnector.addTaskIfNotExist(task, CoordinatedTask.States.NONE); + } + + /** + * Add the task. + * + * @param task - The coordinated task which needs to be added. + * @param state - Initial state of the task. + */ + public void addTaskIfNotExist(String task, CoordinatedTask.States state) throws TaskCoordinationException { + + rdmbsConnector.addTaskIfNotExist(task, state); } /** diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java index bbeb8c2a4e..f4dc0d7f1a 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java @@ -357,11 +357,12 @@ public List getAllAssignedIncompleteTasks() throws TaskCoordina * * @param taskName - The task which needs to be added. */ - public void addTaskIfNotExist(String taskName) throws TaskCoordinationException { + public void addTaskIfNotExist(String taskName, CoordinatedTask.States state) throws TaskCoordinationException { try (Connection connection = getConnection(); PreparedStatement preparedStatement = connection.prepareStatement( ADD_TASK)) { preparedStatement.setString(1, taskName); + preparedStatement.setString(2, state.name()); preparedStatement.executeUpdate(); if (LOG.isDebugEnabled()) { LOG.debug("Successfully added the task [" + taskName + "]."); diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java index 518455e087..90db32b0f4 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java @@ -40,7 +40,7 @@ public class TaskQueryHelper { static final String ADD_TASK = "INSERT INTO " + TABLE_NAME + " ( " + TASK_NAME + ", " + DESTINED_NODE_ID + ", " + TASK_STATE + ") " - + "VALUES (?,NULL,'" + CoordinatedTask.States.NONE + "')"; + + "VALUES (?,NULL,?)"; static final String UPDATE_ASSIGNMENT_AND_STATE = "UPDATE " + TABLE_NAME + " SET " + DESTINED_NODE_ID + " = ? , " + TASK_STATE + " = " + TASK_STATE_CONST diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java index 7bd4c37616..3f7457cd46 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java @@ -60,6 +60,8 @@ public interface TaskManager { */ void handleTask(String taskName) throws TaskException; + void handleTask(String taskName, boolean scheduledInPausedMode) throws TaskException; + /** * Get all the coordinated tasks ( the tasks which need db interaction ) deployed in this node. * @@ -111,6 +113,8 @@ public interface TaskManager { boolean isDeactivated(String taskName) throws TaskException; + boolean isTaskRunning(String taskName) throws TaskException; + /** * Get task information. * diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java index 10be00225e..6ae2dcfb8c 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java @@ -45,6 +45,14 @@ public class TaskUtils { public static final String TASK_STATE_PROPERTY = "TASK_STATE_PROPERTY"; + public static final String TASK_OWNER_PROPERTY = "taskOwner"; + + public static final String TASK_OWNER_NAME = "taskOwnerName"; + + public static final String TASK_BELONGS_TO_INBOUND_ENDPOINT = "InboundEndpoint"; + + public static final String START_IN_PAUSED_MODE = "startInPausedMode"; + private static SecretResolver secretResolver; public static Document convertToDocument(File file) throws TaskException { diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java index da08974054..e4fc9f8a90 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java @@ -208,7 +208,7 @@ protected synchronized void scheduleLocalTask(String taskName) throws TaskExcept this.scheduleLocalTask(taskName, paused); } - private synchronized void scheduleLocalTask(String taskName, boolean paused) throws TaskException { + protected synchronized void scheduleLocalTask(String taskName, boolean paused) throws TaskException { TaskInfo taskInfo = this.getTaskRepository().getTask(taskName); String taskGroup = this.getTenantTaskGroup(); diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java index 6df06888d6..ff30cd0490 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.synapse.commons.util.MiscellaneousUtil; import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.inbound.InboundEndpoint; import org.apache.synapse.message.processor.MessageProcessor; import org.apache.synapse.task.TaskDescription; import org.wso2.micro.integrator.core.util.MicroIntegratorBaseUtils; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; /** * This class is responsible for handling / scheduling all tasks in Micro Integrator. @@ -53,7 +55,7 @@ public class ScheduledTaskManager extends AbstractQuartzTaskManager { /** * The list of tasks for which the addition failed. */ - private List additionFailedTasks = new ArrayList<>(); + private List additionFailedTasks = new ArrayList<>(); private List locallyRunningCoordinatedTasks = new ArrayList<>(); @@ -85,27 +87,41 @@ private boolean isMyTaskTypeRegistered() { */ public void handleTask(String taskName) throws TaskException { + handleTask(taskName, false); + } + + public void handleTask(String taskName, boolean scheduledInPausedMode) throws TaskException { if (isCoordinatedTask(taskName)) { if (log.isDebugEnabled()) { log.debug("Adding task [" + taskName + "] to the data base since this is a coordinated task."); } deployedCoordinatedTasks.add(taskName); + CoordinatedTask.States state; + if (scheduledInPausedMode) { + state = CoordinatedTask.States.PAUSED; + } else { + state = CoordinatedTask.States.NONE; + } try { - taskStore.addTaskIfNotExist(taskName); + taskStore.addTaskIfNotExist(taskName, state); } catch (TaskCoordinationException ex) { - additionFailedTasks.add(taskName); + additionFailedTasks.add(new TaskEntry(taskName, state)); throw new TaskException("Error adding task : " + taskName, TaskException.Code.DATABASE_ERROR, ex); } return; } - scheduleTask(taskName); + if (scheduledInPausedMode) { + scheduleTaskInPausedMode(taskName); + } else { + scheduleTask(taskName); + } } - public List getAdditionFailedTasks() { + public List getAdditionFailedTasks() { return new ArrayList<>(additionFailedTasks); } - public void removeTaskFromAdditionFailedTaskList(String taskName) { + public void removeTaskFromAdditionFailedTaskList(TaskEntry taskName) { additionFailedTasks.remove(taskName); } @@ -164,17 +180,11 @@ public void scheduleCoordinatedTask(String taskName) throws TaskException { try { if (taskStore.updateTaskState(taskName, CoordinatedTask.States.RUNNING, localNodeId)) { if (!isPreviouslyScheduled(taskName, getTenantTaskGroup())) { + // update the task repo to remove pause scheduleTask(taskName); } else { resumeLocalTask(taskName); - if (MiscellaneousUtil.isTaskOfMessageProcessor(taskName)) { - String messageProcessorName = MiscellaneousUtil.getMessageProcessorName(taskName); - MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration() - .getMessageProcessors().get(messageProcessorName); - if (messageProcessor != null) { - messageProcessor.resumeRemotely(); - } - } + notifyOnResume(taskName); } locallyRunningCoordinatedTasks.add(taskName); } else { @@ -188,6 +198,38 @@ public void scheduleCoordinatedTask(String taskName) throws TaskException { } } + /** + * Notifies the relevant components to resume operations associated with the specified task. + * + *

This method checks whether the given task is associated with a Message Processor or + * an Inbound Endpoint and triggers the necessary action to resume the respective component. + * If the task belongs to a Message Processor, the associated Message Processor is resumed + * remotely. If the task belongs to an Inbound Endpoint, its state is updated to active. + * + * @param taskName the name of the task to be resumed + */ + private void notifyOnResume(String taskName) { + if (MiscellaneousUtil.isTaskOfMessageProcessor(taskName)) { + String messageProcessorName = MiscellaneousUtil.getMessageProcessorName(taskName); + MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration() + .getMessageProcessors().get(messageProcessorName); + if (messageProcessor != null) { + messageProcessor.resumeRemotely(); + } + return; + } + TaskDescription taskDescription = + synapseEnvironment.getTaskManager().getTaskDescriptionRepository().getTaskDescription(taskName); + if (taskDescription.getProperty(TaskUtils.TASK_OWNER_PROPERTY) + == TaskUtils.TASK_BELONGS_TO_INBOUND_ENDPOINT) { + InboundEndpoint inboundEndpoint = synapseEnvironment.getSynapseConfiguration() + .getInboundEndpoint((String) taskDescription.getProperty(TaskUtils.TASK_OWNER_NAME)); + if (Objects.nonNull(inboundEndpoint)) { + inboundEndpoint.updateInboundEndpointState(false); + } + } + } + public List getLocallyRunningCoordinatedTasks() { return new ArrayList<>(locallyRunningCoordinatedTasks); } @@ -233,6 +275,16 @@ private void scheduleTask(String taskName) throws TaskException { } } + private void scheduleTaskInPausedMode(String taskName) throws TaskException { + if (this.isMyTaskTypeRegistered()) { + this.scheduleLocalTask(taskName, true); + } else { + throw new TaskException( + "Task type: '" + this.getTaskType() + "' is not registered in the current task node", + TaskException.Code.TASK_NODE_NOT_AVAILABLE); + } + } + @Override public boolean deleteTask(String taskName) throws TaskException { @@ -296,7 +348,20 @@ public boolean isDeactivated(String taskName) throws TaskException { } return isDeactivated; } - return getTaskState(taskName).equals(TaskState.PAUSED); + return !(getTaskState(taskName).equals(TaskState.NORMAL) || getTaskState(taskName).equals(TaskState.BLOCKED)); + } + + @Override + public boolean isTaskRunning(String taskName) throws TaskException { + + if (deployedCoordinatedTasks.contains(taskName)) { + boolean isRunning = CoordinatedTask.States.RUNNING.equals( getCoordinatedTaskState(taskName)); + if (log.isDebugEnabled()) { + log.debug("Task [" + taskName + "] is " + (isRunning ? "" : "not") + " in running state."); + } + return isRunning; + } + return getTaskState(taskName).equals(TaskState.NORMAL); } private CoordinatedTask.States getCoordinatedTaskState(String taskName) { @@ -361,4 +426,24 @@ private void resumeTask(String taskName) throws TaskException { TaskUtils.setTaskPaused(this.getTaskRepository(), taskName, false); } + public static class TaskEntry { + private String name; + private CoordinatedTask.States state; + + public TaskEntry(String name, CoordinatedTask.States state) { + this.name = name; + this.state = state; + } + + public String getName() { + + return name; + } + + public CoordinatedTask.States getState() { + + return state; + } + } + } diff --git a/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/application/deployer/CarbonApplication.java b/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/application/deployer/CarbonApplication.java index 652ac0bc70..c9e00c20e2 100644 --- a/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/application/deployer/CarbonApplication.java +++ b/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/application/deployer/CarbonApplication.java @@ -32,6 +32,7 @@ public class CarbonApplication { private String appVersion; private boolean deploymentCompleted; private String mainSequence; + private ClassLoader classLoader; private ApplicationConfiguration appConfig; @@ -109,5 +110,13 @@ public String getMainSequence() { public void setMainSequence(String mainSequence) { this.mainSequence = mainSequence; } + + public ClassLoader getClassLoader() { + return classLoader; + } + + public void setClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + } } diff --git a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java index de916b2400..867a29b129 100644 --- a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java +++ b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java @@ -163,6 +163,7 @@ public class Constants { public static final String AUDIT_LOG_TYPE_LOG_LEVEL = "log_level"; public static final String AUDIT_LOG_TYPE_ROOT_LOG_LEVEL = "root_log_level"; public static final String AUDIT_LOG_TYPE_MESSAGE_PROCESSOR = "message_processor"; + public static final String AUDIT_LOG_TYPE_INBOUND_ENDPOINT = "inbound_endpoint"; public static final String AUDIT_LOG_TYPE_CARBON_APPLICATION = "carbon_application"; public static final String AUDIT_LOG_TYPE_CONNECTOR = "connector"; diff --git a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java index a7d7f9065c..8c42cf4701 100644 --- a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java +++ b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java @@ -32,6 +32,7 @@ import org.wso2.carbon.inbound.endpoint.internal.http.api.APIResource; import org.wso2.micro.integrator.management.apis.security.handler.SecurityUtils; import org.wso2.micro.integrator.security.user.api.UserStoreException; +import org.wso2.micro.core.util.AuditLogger; import java.io.IOException; @@ -43,7 +44,11 @@ import java.util.Objects; import java.util.stream.Collectors; +import static org.wso2.micro.integrator.management.apis.Constants.ACTIVE_STATUS; +import static org.wso2.micro.integrator.management.apis.Constants.INACTIVE_STATUS; +import static org.wso2.micro.integrator.management.apis.Constants.NAME; import static org.wso2.micro.integrator.management.apis.Constants.SEARCH_KEY; +import static org.wso2.micro.integrator.management.apis.Constants.STATUS; import static org.wso2.micro.integrator.management.apis.Constants.SYNAPSE_CONFIGURATION; import static org.wso2.micro.integrator.management.apis.Constants.USERNAME_PROPERTY; @@ -129,9 +134,14 @@ private void handlePost(MessageContext msgCtx, } JSONObject info = new JSONObject(); info.put(INBOUND_ENDPOINT_NAME, inboundName); - response = Utils.handleTracing(performedBy, Constants.AUDIT_LOG_TYPE_INBOUND_ENDPOINT_TRACE, - Constants.INBOUND_ENDPOINTS, info, - inboundEndpoint.getAspectConfiguration(), inboundName, axisMsgCtx); + if (payload.has(STATUS)) { + response = handleStatusUpdate(inboundEndpoint, performedBy, info, msgCtx, payload); + } else { + response = Utils.handleTracing(performedBy, Constants.AUDIT_LOG_TYPE_INBOUND_ENDPOINT_TRACE, + Constants.INBOUND_ENDPOINTS, info, + inboundEndpoint.getAspectConfiguration(), inboundName, axisMsgCtx); + } + } else { response = Utils.createJsonError("Specified inbound endpoint ('" + inboundName + "') not found", axisMsgCtx, Constants.BAD_REQUEST); @@ -165,6 +175,7 @@ private void setResponseBody(Collection inboundEndpointCollecti inboundObject.put(Constants.NAME, inboundEndpoint.getName()); inboundObject.put("protocol", inboundEndpoint.getProtocol()); + inboundObject.put(Constants.STATUS, getInboundEndpointState(inboundEndpoint)); jsonBody.getJSONArray(Constants.LIST).put(inboundObject); } @@ -204,6 +215,7 @@ private JSONObject convertInboundEndpointToJsonObject(InboundEndpoint inboundEnd inboundObject.put("protocol", inboundEndpoint.getProtocol()); inboundObject.put("sequence", inboundEndpoint.getInjectingSeq()); inboundObject.put("error", inboundEndpoint.getOnErrorSeq()); + inboundObject.put(Constants.STATUS, getInboundEndpointState(inboundEndpoint)); String statisticState = inboundEndpoint.getAspectConfiguration().isStatisticsEnable() ? Constants.ENABLED : Constants.DISABLED; inboundObject.put(Constants.STATS, statisticState); @@ -229,4 +241,71 @@ private JSONObject convertInboundEndpointToJsonObject(InboundEndpoint inboundEnd } return inboundObject; } + + /** + * Determines the current state of the specified inbound endpoint. + * + * @param inboundEndpoint The {@link InboundEndpoint} instance whose state is to be retrieved. + * @return A {@link String} representing the state of the inbound endpoint: + * - {@code INACTIVE_STATUS} if the inbound endpoint is deactivated. + * - {@code ACTIVE_STATUS} otherwise. + */ + private String getInboundEndpointState(InboundEndpoint inboundEndpoint) { + if (inboundEndpoint.isDeactivated()) { + return INACTIVE_STATUS; + } + return ACTIVE_STATUS; + } + + /** + * Handles the activation or deactivation of an inbound endpoint based on the provided status. + * + * @param performedBy The user performing the operation, used for audit logging. + * @param info A JSON object containing additional audit information. + * @param messageContext The current Synapse {@link MessageContext} for accessing the configuration. + * @param payload A {@link JsonObject} containing the inbound endpoint name and desired status. + * + * @return A {@link JSONObject} indicating the result of the operation. If successful, contains + * a confirmation message. If unsuccessful, contains an error message with appropriate + * HTTP error codes. + */ + private JSONObject handleStatusUpdate(InboundEndpoint inboundEndpoint, String performedBy, JSONObject info, + MessageContext messageContext, JsonObject payload) { + String name = payload.get(NAME).getAsString(); + String status = payload.get(STATUS).getAsString(); + + org.apache.axis2.context.MessageContext axis2MessageContext = + ((Axis2MessageContext) messageContext).getAxis2MessageContext(); + JSONObject jsonResponse = new JSONObject(); + if (inboundEndpoint == null) { + return Utils.createJsonError("Inbound Endpoint could not be found", + axis2MessageContext, Constants.NOT_FOUND); + } + + if (INACTIVE_STATUS.equalsIgnoreCase(status)) { + boolean success = inboundEndpoint.deactivate(); + if (success) { + jsonResponse.put(Constants.MESSAGE_JSON_ATTRIBUTE, name + " : is deactivated"); + AuditLogger.logAuditMessage(performedBy, Constants.AUDIT_LOG_TYPE_INBOUND_ENDPOINT, + Constants.AUDIT_LOG_ACTION_DISABLED, info); + } else { + jsonResponse = Utils.createJsonError("Failed to deactivate the inbound endpoint : " + name, + axis2MessageContext, Constants.INTERNAL_SERVER_ERROR); + } + } else if (ACTIVE_STATUS.equalsIgnoreCase(status)) { + boolean success = inboundEndpoint.activate(); + if (success) { + jsonResponse.put(Constants.MESSAGE_JSON_ATTRIBUTE, name + " : is activated"); + AuditLogger.logAuditMessage(performedBy, Constants.AUDIT_LOG_TYPE_MESSAGE_PROCESSOR, + Constants.AUDIT_LOG_ACTION_ENABLE, info); + } else { + jsonResponse = Utils.createJsonError("Failed to activate the inbound endpoint : " + name, + axis2MessageContext, Constants.INTERNAL_SERVER_ERROR); + } + } else { + jsonResponse = Utils.createJsonError("Provided state is not valid", axis2MessageContext, Constants.BAD_REQUEST); + } + + return jsonResponse; + } } diff --git a/components/org.wso2.micro.integrator.initializer/src/main/java/org/wso2/micro/integrator/initializer/deployment/synapse/deployer/SynapseAppDeployer.java b/components/org.wso2.micro.integrator.initializer/src/main/java/org/wso2/micro/integrator/initializer/deployment/synapse/deployer/SynapseAppDeployer.java index 4e17173cf2..cf21f8cf28 100644 --- a/components/org.wso2.micro.integrator.initializer/src/main/java/org/wso2/micro/integrator/initializer/deployment/synapse/deployer/SynapseAppDeployer.java +++ b/components/org.wso2.micro.integrator.initializer/src/main/java/org/wso2/micro/integrator/initializer/deployment/synapse/deployer/SynapseAppDeployer.java @@ -129,8 +129,8 @@ public void deployArtifacts(CarbonApplication carbonApp, AxisConfiguration axisC List artifacts = carbonApp.getAppConfig().getApplicationArtifact() .getDependencies(); - deployClassMediators(artifacts, axisConfig); - deploySynapseLibrary(artifacts, axisConfig); + deployClassMediators(artifacts, axisConfig, carbonApp); + deploySynapseLibrary(artifacts, axisConfig, carbonApp); Map> artifactTypeMap = getOrderedArtifactsMap(artifacts); //deploy artifacts @@ -335,8 +335,8 @@ && handleMainFaultSeqUndeployment(artifact, axisConfig)) { * @param axisConfig AxisConfiguration of the current tenant * @throws DeploymentException if something goes wrong while deployment */ - private void deployClassMediators(List artifacts, - AxisConfiguration axisConfig) throws DeploymentException { + private void deployClassMediators(List artifacts, AxisConfiguration axisConfig, + CarbonApplication carbonApplication) throws DeploymentException { for (Artifact.Dependency dependency : artifacts) { Artifact artifact = dependency.getArtifact(); @@ -354,7 +354,9 @@ private void deployClassMediators(List artifacts, String artifactPath = artifact.getExtractedPath() + File.separator + fileName; try { - deployer.deploy(new DeploymentFileData(new File(artifactPath), deployer)); + DeploymentFileData deploymentFileData = new DeploymentFileData(new File(artifactPath), deployer); + deploymentFileData.setClassLoader(carbonApplication.getClassLoader()); + deployer.deploy(deploymentFileData); artifact.setDeploymentStatus(AppDeployerConstants.DEPLOYMENT_STATUS_DEPLOYED); } catch (DeploymentException e) { artifact.setDeploymentStatus(AppDeployerConstants.DEPLOYMENT_STATUS_FAILED); @@ -372,8 +374,8 @@ private void deployClassMediators(List artifacts, * @param axisConfig AxisConfiguration of the current tenant * @throws DeploymentException if something goes wrong while deployment */ - private void deploySynapseLibrary(List artifacts, - AxisConfiguration axisConfig) throws DeploymentException { + private void deploySynapseLibrary(List artifacts, AxisConfiguration axisConfig, + CarbonApplication carbonApplication) throws DeploymentException { for (Artifact.Dependency dependency : artifacts) { Artifact artifact = dependency.getArtifact(); @@ -397,7 +399,9 @@ private void deploySynapseLibrary(List artifacts, artifact.setDeploymentStatus(AppDeployerConstants.DEPLOYMENT_STATUS_DEPLOYED); } else { try { - deployer.deploy(new DeploymentFileData(new File(artifactPath), deployer)); + DeploymentFileData deploymentFileData = new DeploymentFileData(new File(artifactPath), deployer); + deploymentFileData.setClassLoader(carbonApplication.getClassLoader()); + deployer.deploy(deploymentFileData); artifact.setDeploymentStatus(AppDeployerConstants.DEPLOYMENT_STATUS_DEPLOYED); try { String artifactName = getArtifactName(artifactPath, axisConfig); @@ -1122,7 +1126,9 @@ public void deployArtifactType(List artifacts, CarbonApplic } else { try { setCustomLogContent(deployer, carbonApp); - deployer.deploy(new DeploymentFileData(new File(artifactPath), deployer)); + DeploymentFileData deploymentFileData = new DeploymentFileData(new File(artifactPath), deployer); + deploymentFileData.setClassLoader(carbonApp.getClassLoader()); + deployer.deploy(deploymentFileData); artifact.setDeploymentStatus(AppDeployerConstants.DEPLOYMENT_STATUS_DEPLOYED); } catch (DeploymentException e) { artifact.setDeploymentStatus(AppDeployerConstants.DEPLOYMENT_STATUS_FAILED); diff --git a/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/payload/factory/PayloadFactorySynapseExpressionTestCase.java b/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/payload/factory/PayloadFactorySynapseExpressionTestCase.java index f8f1b5bf9b..d4fc355fee 100644 --- a/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/payload/factory/PayloadFactorySynapseExpressionTestCase.java +++ b/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/payload/factory/PayloadFactorySynapseExpressionTestCase.java @@ -146,8 +146,8 @@ public void testPayloadFactoryXMLToJson() throws IOException { String expectedResponse = "{\n" + " \"string\": \"John<\",\n" + - " \"escapedObject\": \"catemily\\\\ntini\",\n" + - " \"escapedObject2\": \"\\n dog<\\n John\\\\nWayne\\n \",\n" + + " \"escapedObject\": \"catemily\\\\\\\\ntini\",\n" + + " \"escapedObject2\": \"\\n dog<\\n John\\\\\\\\nWayne\\n \",\n" + " \"object\": {\n" + " \"pet\": {\n" + " \"type\": \"cat\",\n" + diff --git a/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/v2/LogMediatorTestCase.java b/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/v2/LogMediatorTestCase.java new file mode 100644 index 0000000000..ad487ad676 --- /dev/null +++ b/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/v2/LogMediatorTestCase.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.esb.mediator.test.v2; + +import org.apache.http.HttpResponse; +import org.apache.http.util.EntityUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.wso2.esb.integration.common.utils.CarbonLogReader; +import org.wso2.esb.integration.common.utils.ESBIntegrationTest; +import org.wso2.esb.integration.common.utils.clients.SimpleHttpClient; + +import java.io.IOException; + +public class LogMediatorTestCase extends ESBIntegrationTest { + + SimpleHttpClient httpClient = new SimpleHttpClient(); + + @BeforeClass(alwaysRun = true) + public void init() throws Exception { + + super.init(); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Log mediator INFO level") + public void testLogMediatorINFO() throws IOException, InterruptedException { + + CarbonLogReader carbonLogReader = new CarbonLogReader(); + carbonLogReader.start(); + + String requestPayload = "{\n" + + " \"data\": {\n" + + " \"name\": \"John\",\n" + + " \"food\": {\n" + + " \"cal\": \"55\",\n" + + " \"sugar\": \"none\"\n" + + " }\n" + + " }\n" + + "}"; + + String serviceURL = getMainSequenceURL() + "log-mediator-template/info"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/json"); + Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200, "Response code mismatched"); + EntityUtils.consumeQuietly(httpResponse.getEntity()); + + boolean logLine1 = carbonLogReader + .checkForLog("Processing info message: {\"name\":\"John\",\"food\":{\"cal\":\"55\",\"sugar\":\"none\"}} " + + "using endpoint http://localhost:8480/log-mediator-template/mock-backend-json", DEFAULT_TIMEOUT); + Assert.assertTrue(logLine1, "Log mediator INFO message Line 1 not logged"); + + boolean logLine2 = carbonLogReader + .checkForLog("Backend result = {\"pet\":{\"name\":\"pet3\",\"type\":\"mock-backend\"}}|" + + "requestID = John_123123|test = abc123", DEFAULT_TIMEOUT); + Assert.assertTrue(logLine2, "Log mediator INFO message Line 2 not logged"); + + carbonLogReader.stop(); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Log mediator WARN level") + public void testLogMediatorWARN() throws IOException, InterruptedException { + + CarbonLogReader carbonLogReader = new CarbonLogReader(); + carbonLogReader.start(); + + String requestPayload = "{\n" + + " \"data\": {\n" + + " \"name\": \"John\",\n" + + " \"food\": {\n" + + " \"cal\": \"55\",\n" + + " \"sugar\": \"none\"\n" + + " }\n" + + " }\n" + + "}"; + + String serviceURL = getMainSequenceURL() + "log-mediator-template/warn"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/json"); + Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200, "Response code mismatched"); + EntityUtils.consumeQuietly(httpResponse.getEntity()); + + boolean logLine1 = carbonLogReader + .checkForLog("Processing message: {\"name\":\"John\",\"food\":{\"cal\":\"55\",\"sugar\":\"none\"}} " + + "using endpoint", DEFAULT_TIMEOUT); + Assert.assertTrue(logLine1, "Log mediator WARN message Line 1 not logged"); + + boolean logLine2 = carbonLogReader + .checkForLog("Log warn message, payload = {\"pet\":{\"name\":\"pet3\",\"type\":\"mock-backend\"}}, " + + "content-type-header = application/json; charset=UTF-8", DEFAULT_TIMEOUT); + Assert.assertTrue(logLine2, "Log mediator WARN message Line 2 not logged"); + + carbonLogReader.stop(); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Log mediator ERROR level") + public void testLogMediatorERROR() throws IOException, InterruptedException { + + CarbonLogReader carbonLogReader = new CarbonLogReader(); + carbonLogReader.start(); + + String requestPayload = "{\n" + + " \"data\": {\n" + + " \"name\": \"John\",\n" + + " \"food\": {\n" + + " \"cal\": \"55\",\n" + + " \"sugar\": \"none\"\n" + + " }\n" + + " }\n" + + "}"; + + String serviceURL = getMainSequenceURL() + "log-mediator-template/error"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/json"); + Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200, "Response code mismatched"); + EntityUtils.consumeQuietly(httpResponse.getEntity()); + + boolean logLine1 = carbonLogReader + .checkForLog("Processing error message: {\"name\":\"John\",\"food\":{\"cal\":\"55\",\"sugar\":\"none\"}}", + DEFAULT_TIMEOUT); + Assert.assertTrue(logLine1, "Log mediator ERROR message Line 1 not logged"); + + boolean logLine2 = carbonLogReader + .checkForLog("Error occurred while processing backend response, STATUS_CODE = 200", DEFAULT_TIMEOUT); + Assert.assertTrue(logLine2, "Log mediator ERROR message Line 2 not logged"); + + carbonLogReader.stop(); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Log mediator FATAL level") + public void testLogMediatorFATAL() throws IOException, InterruptedException { + + CarbonLogReader carbonLogReader = new CarbonLogReader(); + carbonLogReader.start(); + + String requestPayload = "John55none"; + + String serviceURL = getMainSequenceURL() + "log-mediator-template/fatal"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/xml"); + Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200, "Response code mismatched"); + EntityUtils.consumeQuietly(httpResponse.getEntity()); + + boolean logLine1 = carbonLogReader + .checkForLog("Critical issue detected: 55none, prop1 = synapse_prop1", + DEFAULT_TIMEOUT); + Assert.assertTrue(logLine1, "Log mediator FATAL message Line 1 not logged"); + carbonLogReader.stop(); + } + + @AfterClass(alwaysRun = true) + private void destroy() throws Exception { + + super.cleanup(); + } +} diff --git a/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/v2/ScatterGatherTestCase.java b/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/v2/ScatterGatherTestCase.java new file mode 100644 index 0000000000..b4e92a7a67 --- /dev/null +++ b/integration/mediation-tests/tests-mediator-1/src/test/java/org/wso2/carbon/esb/mediator/test/v2/ScatterGatherTestCase.java @@ -0,0 +1,267 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.esb.mediator.test.v2; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.http.HttpResponse; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.w3c.dom.Document; +import org.wso2.esb.integration.common.utils.CarbonLogReader; +import org.wso2.esb.integration.common.utils.ESBIntegrationTest; +import org.wso2.esb.integration.common.utils.clients.SimpleHttpClient; +import org.xml.sax.SAXException; + +import java.io.IOException; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class ScatterGatherTestCase extends ESBIntegrationTest { + + SimpleHttpClient httpClient = new SimpleHttpClient(); + + @BeforeClass(alwaysRun = true) + public void init() throws Exception { + + super.init(); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Scatter-Gather mediator with JSON body replace") + public void testScatterGatherJSONBodyReplace() throws IOException { + + String expectedResponse = "[\n" + + " {\n" + + " \"name\":\"pet1\",\n" + + " \"type\":\"dog\",\n" + + " \"requestId\":1114567\n" + + " },\n" + + " {\n" + + " \"name\":\"pet2\",\n" + + " \"type\":\"cat\",\n" + + " \"requestId\":1114567\n" + + " },\n" + + " {\n" + + " \"name\":\"pet3\",\n" + + " \"type\":\"mock-backend\",\n" + + " \"requestId\":1114567\n" + + " }\n" + + "]"; + + String requestPayload = "{\n" + + " \"requestId\": 1114567\n" + + "}"; + + String serviceURL = getMainSequenceURL() + "scatter-gather/json-body-replace"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/json"); + String responsePayload = httpClient.getResponsePayload(httpResponse); + + JsonElement responseJSON = JsonParser.parseString(responsePayload); + JsonElement expectedJSON = JsonParser.parseString(expectedResponse); + assertTrue(areJsonElementsEquivalent(expectedJSON, responseJSON), "Response payload mismatched"); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Scatter-Gather mediator with JSON and variable output") + public void testScatterGatherJSONVariableOutput() throws IOException, InterruptedException { + + CarbonLogReader carbonLogReader = new CarbonLogReader(); + carbonLogReader.start(); + + String requestPayload = "{\n" + + " \"requestId\": 1114567\n" + + "}"; + + String serviceURL = getMainSequenceURL() + "scatter-gather/json-variable-output"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/json"); + String responsePayload = httpClient.getResponsePayload(httpResponse); + + JsonElement responseJSON = JsonParser.parseString(responsePayload); + JsonElement expectedJSON = JsonParser.parseString(requestPayload); + assertEquals(responseJSON, expectedJSON, "Response payload mismatched"); + + boolean logFound = carbonLogReader + .checkForLog("Scatter Gather output = [{\"name\":\"pet1\",\"type\":\"dog\",\"requestId\":1114567}," + + "{\"name\":\"pet2\",\"type\":\"cat\",\"requestId\":1114567},{\"name\":\"pet3\",\"type\":\"mock-backend\"," + + "\"requestId\":1114567}]", DEFAULT_TIMEOUT); + Assert.assertTrue(logFound, "Scatter Gather result not set to variable"); + carbonLogReader.stop(); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Scatter-Gather mediator with XML body replace") + public void testScatterGatherXMLBodyReplace() throws IOException, ParserConfigurationException, SAXException { + + String expectedResponse = "pet1cat78658" + + "pet2mock-backend78658"; + + String requestPayload = "\n" + + " 78658\n" + + ""; + + String serviceURL = getMainSequenceURL() + "scatter-gather/xml-body-replace"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/xml"); + String responsePayload = httpClient.getResponsePayload(httpResponse); + + Document document1 = parseXML(expectedResponse); + Document document2 = parseXML(responsePayload); + + if (!document1.isEqualNode(document2)) { + Assert.fail("Response payload mismatched: " + responsePayload + " expected: " + expectedResponse); + } + } + + @Test(groups = {"wso2.esb"}, description = "Testing Scatter-Gather mediator with XML and variable output") + public void testScatterGatherXMLVariableOutput() throws IOException, InterruptedException, ParserConfigurationException, SAXException { + + CarbonLogReader carbonLogReader = new CarbonLogReader(); + carbonLogReader.start(); + + String requestPayload = "\n" + + " 78658\n" + + ""; + + String serviceURL = getMainSequenceURL() + "scatter-gather/xml-variable-output"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/xml"); + String responsePayload = httpClient.getResponsePayload(httpResponse); + + Document document1 = parseXML(requestPayload); + Document document2 = parseXML(responsePayload); + + if (!document1.isEqualNode(document2)) { + Assert.fail("Response payload mismatched: " + responsePayload + " expected: " + requestPayload); + } + + boolean logFound = carbonLogReader + .checkForLog("Scatter Gather XML output = pet1cat" + + "78658pet2dog78658" + + "", DEFAULT_TIMEOUT); + Assert.assertTrue(logFound, "Scatter Gather result not set to variable"); + carbonLogReader.stop(); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Scatter-Gather mediator with Aggregation condition") + public void testScatterGatherJSONCondition() throws IOException { + + String expectedResponse = "[\n" + + " {\n" + + " \"name\":\"pet1\",\n" + + " \"type\":\"dog\",\n" + + " \"requestId\":1114567\n" + + " },\n" + + " {\n" + + " \"name\":\"pet3\",\n" + + " \"type\":\"dog\",\n" + + " \"requestId\":1114567\n" + + " }\n" + + "]"; + + String requestPayload = "{\n" + + " \"requestId\": 1114567\n" + + "}"; + + String serviceURL = getMainSequenceURL() + "scatter-gather/aggregate-condition"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/json"); + String responsePayload = httpClient.getResponsePayload(httpResponse); + + JsonElement responseJSON = JsonParser.parseString(responsePayload); + JsonElement expectedJSON = JsonParser.parseString(expectedResponse); + assertTrue(areJsonElementsEquivalent(expectedJSON, responseJSON), "Response payload mismatched"); + } + + @Test(groups = {"wso2.esb"}, description = "Testing Scatter-Gather mediator when a path fails") + public void testScatterGatherJSON404EPClone() throws IOException { + + String expectedResponse = "[\n" + + " {\n" + + " \"name\":\"pet1\",\n" + + " \"type\":\"dog\",\n" + + " \"requestId\":1114567\n" + + " },\n" + + " {\n" + + " \"name\":\"pet3\",\n" + + " \"type\":\"dog\",\n" + + " \"requestId\":1114567\n" + + " }\n" + + "]"; + + String requestPayload = "{\n" + + " \"requestId\": 1114567\n" + + "}"; + + String serviceURL = getMainSequenceURL() + "scatter-gather/not-found-ep"; + HttpResponse httpResponse = httpClient.doPost(serviceURL, null, requestPayload, "application/json"); + String responsePayload = httpClient.getResponsePayload(httpResponse); + + JsonElement responseJSON = JsonParser.parseString(responsePayload); + JsonElement expectedJSON = JsonParser.parseString(expectedResponse); + assertTrue(areJsonElementsEquivalent(expectedJSON, responseJSON), "Response payload mismatched"); + } + + private static boolean areJsonElementsEquivalent(JsonElement e1, JsonElement e2) { + + if (e1.isJsonObject() && e2.isJsonObject()) { + JsonObject obj1 = e1.getAsJsonObject(); + JsonObject obj2 = e2.getAsJsonObject(); + + if (obj1.size() != obj2.size()) { + return false; + } + + for (String key : obj1.keySet()) { + if (!obj2.has(key) || !areJsonElementsEquivalent(obj1.get(key), obj2.get(key))) { + return false; + } + } + return true; + } else if (e1.isJsonArray() && e2.isJsonArray()) { + if (e1.getAsJsonArray().size() != e2.getAsJsonArray().size()) { + return false; + } + + for (int i = 0; i < e1.getAsJsonArray().size(); i++) { + if (!areJsonElementsEquivalent(e1.getAsJsonArray().get(i), e2.getAsJsonArray().get(i))) { + return false; + } + } + return true; + } else { + return e1.equals(e2); + } + } + + private static Document parseXML(String xml) throws IOException, ParserConfigurationException, SAXException { + + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setIgnoringElementContentWhitespace(true); + DocumentBuilder builder = factory.newDocumentBuilder(); + return builder.parse(new java.io.ByteArrayInputStream(xml.getBytes())); + } + + @AfterClass(alwaysRun = true) + private void destroy() throws Exception { + + super.cleanup(); + } +} diff --git a/integration/mediation-tests/tests-mediator-1/src/test/resources/artifacts/ESB/server/repository/deployment/server/synapse-configs/default/api/TestLogMediatorTemplating.xml b/integration/mediation-tests/tests-mediator-1/src/test/resources/artifacts/ESB/server/repository/deployment/server/synapse-configs/default/api/TestLogMediatorTemplating.xml new file mode 100644 index 0000000000..25c42b6592 --- /dev/null +++ b/integration/mediation-tests/tests-mediator-1/src/test/resources/artifacts/ESB/server/repository/deployment/server/synapse-configs/default/api/TestLogMediatorTemplating.xml @@ -0,0 +1,104 @@ + + + + + + + + + Processing info message: ${payload.data} using endpoint ${var.endpoint} + + + + + + + + Backend result = ${payload} + + + + + + + + + + Processing message: ${payload.data} using endpoint ${var.undefined} + + + + + + + + Log warn message + + + + + + + + + + Processing error message: ${payload.data} + + + + + + + + Error occurred while processing backend response + + + + + + + + + + + + Critical issue detected: ${xpath('//data/food')} + + + + + + + + + { + "pet": { + "name": "pet3", + "type": "mock-backend" + } + } + + + + + + + + diff --git a/integration/mediation-tests/tests-mediator-1/src/test/resources/artifacts/ESB/server/repository/deployment/server/synapse-configs/default/api/TestScatterGatherMediator.xml b/integration/mediation-tests/tests-mediator-1/src/test/resources/artifacts/ESB/server/repository/deployment/server/synapse-configs/default/api/TestScatterGatherMediator.xml new file mode 100644 index 0000000000..255ac63ba4 --- /dev/null +++ b/integration/mediation-tests/tests-mediator-1/src/test/resources/artifacts/ESB/server/repository/deployment/server/synapse-configs/default/api/TestScatterGatherMediator.xml @@ -0,0 +1,288 @@ + + + + + + + + + + { + "pet": { + "name": "pet1", + "type": "dog", + "requestId": ${payload.requestId} + } + } + + + + + + + + pet2 + cat + ${payload.requestId} + + + + + + + + + + + + + + + + + + + + + + { + "pet": { + "name": "pet1", + "type": "dog", + "requestId": ${payload.requestId} + } + } + + + + + + + + pet2 + cat + ${payload.requestId} + + + + + + + + + + + + + + Scatter Gather output = ${var.output_var} + + + + + + + + + + + + + + pet1 + cat + ${xpath('//requestId')} + + + + + + + + + + + + + + + + + + + + + + + + + + + + pet1 + cat + ${xpath('//requestId')} + + + + + + + + + + + pet2 + dog + ${xpath('//requestId')} + + + + + + + + + Scatter Gather XML output = ${var.output_var} + + + + + + + + + + + + + { + "pet": { + "name": "pet1", + "type": "dog", + "requestId": ${payload.requestId} + } + } + + + + + + { + "pet": { + "name": "pet2", + "type": "cat", + "requestId": ${payload.requestId} + } + } + + + + + + { + "pet": { + "name": "pet3", + "type": "dog", + "requestId": ${payload.requestId} + } + } + + + + + + + + + + + + + + { + "pet": { + "name": "pet1", + "type": "dog", + "requestId": ${payload.requestId} + } + } + + + + + + + + + + + + + { + "pet": { + "name": "pet3", + "type": "dog", + "requestId": ${payload.requestId} + } + } + + + + + + + + + + + { + "pet": { + "name": "pet3", + "type": "mock-backend", + "requestId": ${payload.requestId} + } + } + + + + + + + + + + + + + pet2 + mock-backend + ${xpath('//requestId')} + + + + + + + + + diff --git a/integration/mediation-tests/tests-mediator-1/src/test/resources/testng.xml b/integration/mediation-tests/tests-mediator-1/src/test/resources/testng.xml index a4c33b0e21..99b8bdbcba 100644 --- a/integration/mediation-tests/tests-mediator-1/src/test/resources/testng.xml +++ b/integration/mediation-tests/tests-mediator-1/src/test/resources/testng.xml @@ -325,6 +325,11 @@ + + + + + diff --git a/integration/pom.xml b/integration/pom.xml index 19f5f15d34..2b5093727a 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -813,7 +813,7 @@ 2.0.53.Final 1.2.0 5.9.1 - 2.1.0.wso2v1 + 2.4.0.wso2v2 2.2 1.1.1 3.5.9 diff --git a/pom.xml b/pom.xml index 78c94beb54..b9afda229b 100644 --- a/pom.xml +++ b/pom.xml @@ -1722,7 +1722,7 @@ 4.2.0 2.5.0 33.0.0-jre - 2.1.0.wso2v1 + 2.4.0.wso2v2 4.8.2 4.1.108.Final 2.0.53.Final