From f2cfc2e22a81d5c393e7b9e87c021a921437c3bd Mon Sep 17 00:00:00 2001 From: malakaganga Date: Wed, 11 Sep 2024 20:29:12 +0530 Subject: [PATCH 1/2] Fix task duplication in a volatile environment This fix ensures robust handling of database delays without introducing task duplication (compromising availability over consistency) By introducing a timeout for each db event and disabling tasks on timeouts also stopping adding or removing members in a db connection turbulence is present. Fixes: https://github.com/wso2/micro-integrator/issues/3463 --- .../coordination/task/TaskEventListener.java | 34 ++- .../scehduler/CoordinatedTaskScheduler.java | 31 ++- .../coordination/ClusterEventListener.java | 2 +- .../coordination/MemberEventListener.java | 3 +- .../RDBMSCommunicationBusContextImpl.java | 170 ++++++++++-- .../RDBMSCoordinationStrategy.java | 262 ++++++++++++++++-- .../RDBMSMemberEventCallBack.java | 22 ++ .../RDBMSMemberEventListenerTask.java | 68 +++-- .../RDBMSMemberEventProcessor.java | 39 ++- 9 files changed, 541 insertions(+), 90 deletions(-) create mode 100644 components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventCallBack.java diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java index 2d2a173ef1..a43730e470 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.wso2.micro.integrator.coordination.ClusterCoordinator; import org.wso2.micro.integrator.coordination.MemberEventListener; +import org.wso2.micro.integrator.coordination.RDBMSMemberEventCallBack; import org.wso2.micro.integrator.coordination.node.NodeDetail; import org.wso2.micro.integrator.ntask.common.TaskException; import org.wso2.micro.integrator.ntask.coordination.TaskCoordinationException; @@ -79,13 +80,7 @@ && isMemberRejoinedAfterUnresponsiveness()) { // This node became unresponsive and rejoined the cluster hence removing all tasks assigned to this node // then start the scheduler again after cleaning the locally running tasks. becameUnresponsive(nodeDetail.getNodeId()); - try { - //Remove from database - taskStore.deleteTasks(nodeDetail.getNodeId()); - } catch (TaskCoordinationException e) { - LOG.error("Error while removing the tasks of this node.", e); - } - reJoined(nodeDetail.getNodeId()); + reJoined(nodeDetail.getNodeId(), null); } } @@ -117,7 +112,7 @@ public void becameUnresponsive(String nodeId) { ScheduledExecutorService taskScheduler = dataHolder.getTaskScheduler(); if (taskScheduler != null) { LOG.info("Shutting down coordinated task scheduler scheduler since the node became unresponsive."); - taskScheduler.shutdown(); + taskScheduler.shutdownNow(); dataHolder.setTaskScheduler(null); } List tasks = taskManager.getLocallyRunningCoordinatedTasks(); @@ -129,6 +124,13 @@ public void becameUnresponsive(String nodeId) { LOG.error("Unable to pause the task " + task, e); } }); + + try { + // Unassigns tasks from the specified node and updates their state in the task store. + taskStore.unAssignAndUpdateState(nodeId); + } catch (TaskCoordinationException e) { + LOG.error("Error while removing the tasks of this node.", e); + } } /** @@ -142,7 +144,7 @@ public boolean isMemberRejoinedAfterUnresponsiveness() { @Override - public void reJoined(String nodeId) { + public void reJoined(String nodeId, RDBMSMemberEventCallBack callBack) { LOG.debug("This node re-joined the cluster successfully."); try { @@ -150,14 +152,16 @@ public void reJoined(String nodeId) { // hasn't happened already or the task hasn't been captured by task cleaning event. // this will ensure that the task duplication doesn't occur. taskStore.unAssignAndUpdateState(nodeId); + // start the scheduler again since the node joined cluster successfully. + CoordinatedTaskScheduleManager scheduleManager = new CoordinatedTaskScheduleManager(taskManager, taskStore, + clusterCoordinator, locationResolver); + scheduleManager.startTaskScheduler(" upon rejoining the cluster"); } catch (Throwable e) { // catching throwable so that we don't miss starting the scheduler - LOG.error("Error occurred while cleaning the tasks of node " + nodeId, e); + LOG.error("Error occurred while cleaning the tasks while rejoining of node " + nodeId, e); + if (callBack != null) { + callBack.onExceptionThrown(nodeId, e); + } } - // start the scheduler again since the node joined cluster successfully. - CoordinatedTaskScheduleManager scheduleManager = new CoordinatedTaskScheduleManager(taskManager, taskStore, - clusterCoordinator, - locationResolver); - scheduleManager.startTaskScheduler(" upon rejoining the cluster"); } } diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java index 521460a3cf..4eac3cadd8 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java @@ -85,6 +85,7 @@ public void run() { try { pauseDeactivatedTasks(); scheduleAssignedTasks(CoordinatedTask.States.ACTIVATED); + checkInterrupted(); if (clusterCoordinator.isLeader()) { // cleaning will run for each n times resolving frequency . ( n = 0,1,2 ... ). if (resolveCount % resolvingFrequency == 0) { @@ -101,11 +102,37 @@ public void run() { } // schedule all tasks assigned to this node and in state none scheduleAssignedTasks(CoordinatedTask.States.NONE); + checkInterrupted(); } catch (Throwable throwable) { // catching throwable to prohibit permanent stopping of the executor service. LOG.fatal("Unexpected error occurred while trying to schedule tasks.", throwable); } } + /** + * Check if the task is interrupted. + * + * @throws TaskCoordinationException when something goes wrong connecting to the store + */ + private void checkInterrupted() throws InterruptedException { + + if (Thread.currentThread().isInterrupted()) { + try { + List tasks = taskManager.getLocallyRunningCoordinatedTasks(); + // stop all running coordinated tasks. + tasks.forEach(task -> { + try { + taskManager.stopExecution(task); + } catch (TaskException e) { + LOG.error("Unable to pause the task " + task, e); + } + }); + } finally { + Thread.currentThread().interrupt(); + throw new InterruptedException("Task was interrupted."); + } + } + } + /** * Pause ( stop execution ) the deactivated tasks. * @@ -166,7 +193,8 @@ private void addFailedTasks() throws TaskCoordinationException { * @param state - The state of the tasks which need to be scheduled. * @throws TaskCoordinationException - When something goes wrong while retrieving all the assigned tasks. */ - private void scheduleAssignedTasks(CoordinatedTask.States state) throws TaskCoordinationException { + private void scheduleAssignedTasks(CoordinatedTask.States state) throws TaskCoordinationException + , InterruptedException { LOG.debug("Retrieving tasks assigned to this node and to be scheduled."); List tasksOfThisNode = taskStore.retrieveTaskNames(localNodeId, state); @@ -184,6 +212,7 @@ private void scheduleAssignedTasks(CoordinatedTask.States state) throws TaskCoor LOG.debug("Submitting retrieved task [" + taskName + "] to the task manager."); } try { + checkInterrupted(); taskManager.scheduleCoordinatedTask(taskName); } catch (TaskException ex) { if (!TaskException.Code.DATABASE_ERROR.equals(ex.getCode())) { diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/ClusterEventListener.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/ClusterEventListener.java index 614db6475a..08b7150b10 100644 --- a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/ClusterEventListener.java +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/ClusterEventListener.java @@ -62,7 +62,7 @@ public void becameUnresponsive(String nodeId) { } @Override - public void reJoined(String nodeId) { + public void reJoined(String nodeId, RDBMSMemberEventCallBack callBack) { LOG.info("Re-joined the cluster successfully."); } } diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/MemberEventListener.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/MemberEventListener.java index 37562cb31e..6ebbbfa439 100644 --- a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/MemberEventListener.java +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/MemberEventListener.java @@ -57,8 +57,9 @@ public abstract class MemberEventListener { * Invoked when the node is back to active state after being inactive. * * @param nodeId - The Id of this node + * @param callBack - The callback to be called when an exception is thrown */ - public abstract void reJoined(String nodeId); + public abstract void reJoined(String nodeId, RDBMSMemberEventCallBack callBack); public String getGroupId() { return this.groupId; diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java index b21cd35a49..d4744a0de6 100644 --- a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java @@ -80,9 +80,11 @@ public RDBMSCommunicationBusContextImpl() { public void storeMembershipEvent(String changedMember, String groupId, List clusterNodes, int membershipEventType) throws ClusterCoordinationException { Connection connection = null; + boolean isRolledBack = false; // Flag to track if rollback has occurred + boolean isTransactionSuccessful = false; PreparedStatement storeMembershipEventPreparedStatement = null; String task = "Storing membership event: " + membershipEventType + " for member: " + changedMember - + " in group " + groupId; + + " in group " + groupId; try { connection = getConnection(); storeMembershipEventPreparedStatement = connection @@ -95,15 +97,27 @@ public void storeMembershipEvent(String changedMember, String groupId, List clusterNodes) throws ClusterCoordinationException { Connection connection = null; + boolean isRolledBack = false; // Flag to track if rollback has occurred + boolean isTransactionSuccessful = false; PreparedStatement storeRemovedMembersPreparedStatement = null; String task = "Storing removed member: " + removedMember + " in group " + groupId; try { @@ -563,15 +660,26 @@ public void insertRemovedNodeDetails(String removedMember, String groupId, List< storeRemovedMembersPreparedStatement.addBatch(); } storeRemovedMembersPreparedStatement.executeBatch(); + // Before committing, check if the thread was interrupted + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Thread was interrupted, avoiding commit."); + } connection.commit(); + isTransactionSuccessful = true; if (log.isDebugEnabled()) { log.debug(StringUtil.removeCRLFCharacters(task) + " executed successfully"); } - } catch (SQLException e) { + } catch (SQLException | InterruptedException e) { rollback(connection, task); + isRolledBack = true; // Mark as rolled back + log.warn("Transaction rolled back for task: " + task); throw new ClusterCoordinationException( "Error storing removed member: " + removedMember + " in group " + groupId, e); } finally { + if (!isTransactionSuccessful && !isRolledBack && Thread.currentThread().isInterrupted()) { + rollback(connection, task); + log.warn("Transaction rolled back for task: " + task); + } close(storeRemovedMembersPreparedStatement, task); close(connection, task); } @@ -580,6 +688,8 @@ public void insertRemovedNodeDetails(String removedMember, String groupId, List< @Override public void markNodeAsNotNew(String nodeId, String groupId) throws ClusterCoordinationException { Connection connection = null; + boolean isRolledBack = false; // Flag to track if rollback has occurred + boolean isTransactionSuccessful = false; PreparedStatement preparedStatement = null; try { connection = getConnection(); @@ -590,15 +700,27 @@ public void markNodeAsNotNew(String nodeId, String groupId) throws ClusterCoordi if (updateCount == 0) { log.warn("No record was updated while marking node as not new"); } + // Before committing, check if the thread was interrupted + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Thread was interrupted, avoiding commit."); + } connection.commit(); + isTransactionSuccessful = true; if (log.isDebugEnabled()) { log.debug(RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW + " of node " - + StringUtil.removeCRLFCharacters(nodeId) + " executed successfully"); + + StringUtil.removeCRLFCharacters(nodeId) + " executed successfully"); } - } catch (SQLException e) { + } catch (SQLException | InterruptedException e) { rollback(connection, RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW); - throw new ClusterCoordinationException("Error occurred while " + RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW, e); + isRolledBack = true; // Mark as rolled back + log.warn("Transaction rolled back for task: " + RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW); + throw new ClusterCoordinationException("Error occurred while " + + RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW, e); } finally { + if (!isTransactionSuccessful && !isRolledBack && Thread.currentThread().isInterrupted()) { + rollback(connection, RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW); + log.warn("Transaction rolled back for task: " + RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW); + } close(preparedStatement, RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW); close(connection, RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW); } @@ -725,7 +847,9 @@ private void close(Statement preparedStatement, String task) { private void rollback(Connection connection, String task) { if (connection != null) { try { - connection.rollback(); + if (!connection.isClosed()) { + connection.rollback(); + } } catch (SQLException e) { log.warn("Rollback failed on " + StringUtil.removeCRLFCharacters(task), e); } diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCoordinationStrategy.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCoordinationStrategy.java index 2193b6a56c..7c09ab1681 100644 --- a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCoordinationStrategy.java +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCoordinationStrategy.java @@ -33,9 +33,14 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.sql.DataSource; import static org.wso2.micro.integrator.coordination.util.RDBMSConstantUtils.CLUSTER_CONFIG; @@ -64,11 +69,22 @@ public class RDBMSCoordinationStrategy implements CoordinationStrategy { * This is used to give the user a warning when the heartbeat interval exceeds the limit. */ private double heartbeatWarningMargin; + + /** + * Maximum time taken to read from the database. + */ + private long maxDBReadTime; + /** * Heartbeat retry interval. */ private int heartbeatMaxRetry; + /** + * Interval after which a node is considered inactive after being unresponsive. + */ + private long inactiveIntervalAfterUnresponsive; + /** * Thread executor used to run the coordination algorithm. */ @@ -115,6 +131,8 @@ private RDBMSCoordinationStrategy(RDBMSCommunicationBusContextImpl communication } this.heartbeatMaxRetryInterval = heartBeatInterval * heartbeatMaxRetry; this.heartbeatWarningMargin = heartbeatMaxRetryInterval * 0.75; + this.maxDBReadTime = (long) (heartbeatWarningMargin / 30); + this.inactiveIntervalAfterUnresponsive = heartbeatMaxRetryInterval * 2L; ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setPriority(7) .setNameFormat("RDBMSCoordinationStrategy-%d").build(); @@ -123,7 +141,7 @@ private RDBMSCoordinationStrategy(RDBMSCommunicationBusContextImpl communication this.localNodeId = getNodeId(); this.communicationBusContext = communicationBusContext; this.rdbmsMemberEventProcessor = new RDBMSMemberEventProcessor(localNodeId, localGroupId, - (int) heartbeatWarningMargin, communicationBusContext); + (int) heartbeatWarningMargin, communicationBusContext, maxDBReadTime); } /** @@ -266,6 +284,16 @@ public void registerEventListener(MemberEventListener memberEventListener) { rdbmsMemberEventProcessor.addEventListener(memberEventListener); } + /** + * Marks the specified node as unresponsive. + * + * @param nodeId the ID of the node to be marked as unresponsive + * @param groupId the ID of the group to which the node belongs + */ + public void setUnresponsiveness(String nodeId, String groupId) { + rdbmsMemberEventProcessor.setMemberUnresponsiveIfNeeded(nodeId, groupId, true); + } + private String getNodeId() { String nodeId = System.getProperty(NODE_ID); @@ -363,6 +391,11 @@ private class CoordinatorElectionTask { */ private String localGroupId; + /** + * Executor service used to communicate with the database. + */ + private ExecutorService dbCommunicatorExecutor = Executors.newSingleThreadExecutor(); + /** * Constructor. * @@ -399,6 +432,12 @@ public void runCoordinationElectionTask(long currentHeartbeatTime) { performCoordinatorTask(currentHeartbeatTime, timeTakenForCoordinatorTasks); break; } + if (rdbmsMemberEventProcessor.isMemberUnresponsive()) { + log.info("Initiating unresponsive member recovery process for node " + localNodeId); + rdbmsMemberEventProcessor.setMemberUnresponsiveIfNeeded(localNodeId, localGroupId + , false); + rdbmsMemberEventProcessor.setMemberRejoined(localNodeId, localGroupId); + } long clusterTaskEndingTime = System.currentTimeMillis(); if (log.isDebugEnabled() && clusterTaskEndingTime - currentHeartbeatTime > 1000) { log.debug("Cluster task took " + @@ -430,9 +469,38 @@ public void runCoordinationElectionTask(long currentHeartbeatTime) { } // We are catching throwable to avoid subsequent executions getting suppressed } catch (Throwable e) { - log.error("Error detected while running coordination algorithm. Node became a " - + NodeState.MEMBER + " node in group " + localGroupId, e); currentNodeState = NodeState.MEMBER; + try { + // Sleep for the duration of the inactiveIntervalAfterUnresponsive to allow time for the database to + // recover and give other nodes to take over the coordinator role and/or remove the node from the + // group. + Thread.sleep(inactiveIntervalAfterUnresponsive); + } catch (InterruptedException ex) { + // ignore + } + } + } + + /** + * Perform DB operations with a timeout. + * + * @param task the callable task to be performed + * @param the return type of the task + * @return the result of the task + * @throws ClusterCoordinationException if the task execution takes more time than the max DB timeout interval + */ + public T performDBOperationsWithTimeout(Callable task) throws ClusterCoordinationException { + Future future = dbCommunicatorExecutor.submit(task); + try { + return future.get(maxDBReadTime, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + try { + future.cancel(true); // Cancel the task if it exceeds the max DB timeout interval + } catch (Exception ex) { + //ignore + } + throw new ClusterCoordinationException("Database connection turned unresponsive. " + + "Please increase the heartbeat interval or verify the database connection.", e); } } @@ -446,18 +514,43 @@ private void performMemberTask(long currentHeartbeatTime, long[] timeTakenForMem throws ClusterCoordinationException, InterruptedException { long taskStartTime = System.currentTimeMillis(); long taskEndTime; - updateNodeHeartBeat(currentHeartbeatTime); + try { + performDBOperationsWithTimeout(() -> { + updateNodeHeartBeat(currentHeartbeatTime); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e, "Error updating node heartbeat."); + throw e; + } taskEndTime = System.currentTimeMillis(); timeTakenForMemberTasks[0] = taskEndTime - taskStartTime; taskStartTime = taskEndTime; - boolean coordinatorValid = communicationBusContext.checkIfCoordinatorValid(localGroupId, localNodeId, - heartbeatMaxRetryInterval, currentHeartbeatTime); + boolean coordinatorValid; + try { + coordinatorValid = performDBOperationsWithTimeout(() -> + communicationBusContext.checkIfCoordinatorValid + (localGroupId, localNodeId, heartbeatMaxRetryInterval, currentHeartbeatTime) + ); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e, "Error checking if coordinator is valid."); + throw e; + } taskEndTime = System.currentTimeMillis(); timeTakenForMemberTasks[1] = taskEndTime - taskStartTime; if (!coordinatorValid) { taskStartTime = taskEndTime; - communicationBusContext.removeCoordinator(localGroupId, heartbeatMaxRetryInterval, - currentHeartbeatTime); + try { + performDBOperationsWithTimeout(() -> { + communicationBusContext.removeCoordinator(localGroupId, heartbeatMaxRetryInterval + , currentHeartbeatTime); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e, "Error removing coordinator for group " + + localGroupId); + throw e; + } taskEndTime = System.currentTimeMillis(); timeTakenForMemberTasks[2] = taskEndTime - taskStartTime; taskStartTime = taskEndTime; @@ -492,17 +585,49 @@ private void performCoordinatorTask(long currentHeartbeatTime, long[] timeTakenF // Try to update the coordinator heartbeat long taskStartTime = System.currentTimeMillis(); long taskEndTime; - boolean stillCoordinator = communicationBusContext. - updateCoordinatorHeartbeat(localNodeId, localGroupId, currentHeartbeatTime); + boolean stillCoordinator; + try { + stillCoordinator = performDBOperationsWithTimeout(() -> + communicationBusContext.updateCoordinatorHeartbeat + (localNodeId, localGroupId, currentHeartbeatTime)); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e, + "Error updating coordinator heartbeat in LEADER_STATUS_TABLE due to database delay." + + " Stopping coordinated tasks for this node. Please increase the heartbeat interval or" + + " verify the database connection."); + throw e; + } taskEndTime = System.currentTimeMillis(); timeTakenForCoordinatorTasks[0] = taskEndTime - taskStartTime; taskStartTime = taskEndTime; if (stillCoordinator) { - updateNodeHeartBeat(currentHeartbeatTime); + try { + performDBOperationsWithTimeout(() -> { + updateNodeHeartBeat(currentHeartbeatTime); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e, + "Error updating node heartbeat in CLUSTER_NODE_STATUS_TABLE due to database delay." + + " Stopping coordinated tasks for this node. Please increase the heartbeat interval" + + " or verify the database connection."); + throw e; + } taskEndTime = System.currentTimeMillis(); timeTakenForCoordinatorTasks[1] = taskEndTime - taskStartTime; taskStartTime = taskEndTime; - List allNodeInformation = communicationBusContext.getAllNodeData(localGroupId); + + List allNodeInformation; + try { + allNodeInformation = performDBOperationsWithTimeout(() -> + communicationBusContext.getAllNodeData(localGroupId)); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e, "Error retrieving all node data from" + + " LEADER_STATUS_TABLE and CLUSTER_NODE_STATUS_TABLE due to database delay." + + " Stopping coordinated tasks for this node. Please increase the heartbeat interval or" + + " verify the database connection."); + throw e; + } taskEndTime = System.currentTimeMillis(); timeTakenForCoordinatorTasks[2] = taskEndTime - taskStartTime; taskStartTime = taskEndTime; @@ -537,16 +662,49 @@ private void findAddedRemovedMembers(List allNodeInformation, long c removedNodes.add(nodeId); allActiveNodeIds.remove(nodeId); removedNodeDetails.add(nodeDetail); - communicationBusContext.removeNode(nodeId, localGroupId); + try { + performDBOperationsWithTimeout(() -> { + communicationBusContext.removeNode(nodeId, localGroupId); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(nodeId, localGroupId, e, "Error removing node " + nodeId + + " from group " + localGroupId + " due to database delay."); + throw e; + } } else if (nodeDetail.isNewNode()) { newNodes.add(nodeId); - communicationBusContext.markNodeAsNotNew(nodeId, localGroupId); + try { + performDBOperationsWithTimeout(() -> { + communicationBusContext.markNodeAsNotNew(nodeId, localGroupId); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(nodeId, localGroupId, e, "Error marking node as not" + + " new for nodeId: " + nodeId + " in group: " + localGroupId); + throw e; + + } } } + notifyAddedMembers(newNodes, allActiveNodeIds); notifyRemovedMembers(removedNodes, allActiveNodeIds, removedNodeDetails); } + /** + * Handles the database delay. + * + * @param nodeId node ID of the current node + * @param groupId group ID of the current group + * @param e exception occurred + * @param logMessage log message + */ + private void handleDatabaseDelay(String nodeId, String groupId, Exception e, String logMessage) { + log.warn(logMessage + " Make task Sleep for the duration of : " + inactiveIntervalAfterUnresponsive , e); + setUnresponsiveness(nodeId, groupId); + } + /** * Notifies the members in the group about the newly added nodes. * @@ -559,8 +717,17 @@ private void notifyAddedMembers(List newNodes, List allActiveNod log.debug("Member added " + StringUtil.removeCRLFCharacters(newNode) + "to group " + StringUtil.removeCRLFCharacters(localGroupId)); } - rdbmsMemberEventProcessor.notifyMembershipEvent(newNode, localGroupId, allActiveNodeIds, - MemberEventType.MEMBER_ADDED); + try { + performDBOperationsWithTimeout(() -> { + rdbmsMemberEventProcessor.notifyMembershipEvent(newNode, localGroupId, allActiveNodeIds, + MemberEventType.MEMBER_ADDED); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(newNode, localGroupId, e + , "Error notifying membership event for new node " + newNode); + throw e; + } } } @@ -572,8 +739,17 @@ private void notifyAddedMembers(List newNodes, List allActiveNod */ private void storeRemovedMemberDetails(List allActiveNodeIds, List removedNodeDetails) { for (NodeDetail nodeDetail : removedNodeDetails) { - communicationBusContext.insertRemovedNodeDetails(nodeDetail.getNodeId(), nodeDetail.getGroupId(), - allActiveNodeIds); + try { + performDBOperationsWithTimeout(() -> { + communicationBusContext.insertRemovedNodeDetails(nodeDetail.getNodeId() + , nodeDetail.getGroupId(), allActiveNodeIds); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(nodeDetail.getNodeId(), nodeDetail.getGroupId(), e + , "Error inserting removed node details for node " + nodeDetail.getNodeId()); + throw e; + } } } @@ -591,8 +767,17 @@ private void notifyRemovedMembers(List removedNodes, List allAct log.debug("Member removed " + StringUtil.removeCRLFCharacters(removedNode) + "from group " + StringUtil.removeCRLFCharacters(localGroupId)); } - rdbmsMemberEventProcessor.notifyMembershipEvent(removedNode, localGroupId, allActiveNodeIds, - MemberEventType.MEMBER_REMOVED); + try { + performDBOperationsWithTimeout(() -> { + rdbmsMemberEventProcessor.notifyMembershipEvent(removedNode, localGroupId, allActiveNodeIds, + MemberEventType.MEMBER_REMOVED); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(removedNode, localGroupId, e + , "Error notifying membership event for removed node " + removedNode); + throw e; + } } } @@ -605,9 +790,7 @@ private void performElectionTask(long currentHeartbeatTime) throws InterruptedEx try { this.currentNodeState = tryToElectSelfAsCoordinator(currentHeartbeatTime); } catch (ClusterCoordinationException e) { - log.error("Error occurred. Current node became a " + NodeState.MEMBER + " node in group " + - localGroupId + ". " + e.getMessage(), e); - this.currentNodeState = NodeState.MEMBER; + throw e; } } @@ -620,11 +803,27 @@ private void performElectionTask(long currentHeartbeatTime) throws InterruptedEx private NodeState tryToElectSelfAsCoordinator(long currentHeartbeatTime) throws ClusterCoordinationException { NodeState nodeState; - boolean electedAsCoordinator = communicationBusContext.createCoordinatorEntry(localNodeId, localGroupId); + boolean electedAsCoordinator; + try { + electedAsCoordinator = performDBOperationsWithTimeout(() + -> communicationBusContext.createCoordinatorEntry(localNodeId, localGroupId)); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e, + "Error occurred while trying to elect self as coordinator for group " + localGroupId); + throw e; + } if (electedAsCoordinator) { log.info("Elected current node (nodeID: " + localNodeId + ") as the coordinator for the group " + localGroupId); - List allNodeInformation = communicationBusContext.getAllNodeData(localGroupId); + List allNodeInformation; + try { + allNodeInformation = performDBOperationsWithTimeout(() + -> communicationBusContext.getAllNodeData(localGroupId)); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e + , "Error retrieving all node data from the database."); + throw e; + } findAddedRemovedMembers(allNodeInformation, currentHeartbeatTime); nodeState = NodeState.COORDINATOR; // notify nodes about coordinator change @@ -632,8 +831,17 @@ private NodeState tryToElectSelfAsCoordinator(long currentHeartbeatTime) for (NodeDetail nodeDetail : allNodeInformation) { nodeIdentifiers.add(nodeDetail.getNodeId()); } - rdbmsMemberEventProcessor.notifyMembershipEvent(localNodeId, localGroupId, nodeIdentifiers, - MemberEventType.COORDINATOR_CHANGED); + try { + performDBOperationsWithTimeout(() -> { + rdbmsMemberEventProcessor.notifyMembershipEvent(localNodeId, localGroupId, nodeIdentifiers, + MemberEventType.COORDINATOR_CHANGED); + return null; + }); + } catch (ClusterCoordinationException e) { + handleDatabaseDelay(localNodeId, localGroupId, e + , "Error notifying membership event for coordinator change."); + throw e; + } } else { if (log.isDebugEnabled()) { log.debug("Election resulted in current node becoming a " + NodeState.MEMBER diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventCallBack.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventCallBack.java new file mode 100644 index 0000000000..d9748ae65f --- /dev/null +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventCallBack.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.micro.integrator.coordination; + +public interface RDBMSMemberEventCallBack { + void onExceptionThrown(String nodeId, Throwable e); +} diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventListenerTask.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventListenerTask.java index 32a7c3b902..2b4008b4c0 100644 --- a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventListenerTask.java +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventListenerTask.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * The task that runs periodically to detect membership change events. @@ -63,23 +65,25 @@ public class RDBMSMemberEventListenerTask implements Runnable { */ private List listeners; - /** - * Using heart beat warning margin to make sure other nodes in the cluster - * are not starting tasks until this node completes stops the tasks - */ - private int heartbeatWarningMargin; + ExecutorService executor = Executors.newSingleThreadExecutor(); private boolean wasMemberUnresponsive = false; - ExecutorService executor = Executors.newSingleThreadExecutor(); + private final Lock lock = new ReentrantLock(); + + private long maxDBReadTime; + + private int heartbeatWarningMargin; + public RDBMSMemberEventListenerTask(String nodeId, String localGroupId, RDBMSCommunicationBusContextImpl communicationBusContext, - int heartbeatWarningMargin) { + int heartbeatWarningMargin, long maxDBReadTime) { this.nodeID = nodeId; this.localGroupId = localGroupId; this.listeners = new ArrayList<>(); this.communicationBusContext = communicationBusContext; + this.maxDBReadTime = maxDBReadTime; this.heartbeatWarningMargin = heartbeatWarningMargin; } @@ -112,23 +116,46 @@ public RDBMSMemberEventListenerTask(String nodeId, String localGroupId, log.debug("No membership events to sync"); } } - if (wasMemberUnresponsive) { - notifyRejoin(nodeID, localGroupId); - wasMemberUnresponsive = false; - } } catch (Throwable e) { - log.error("Error occurred while reading membership events. ", e); - if (!wasMemberUnresponsive) { - notifyUnresponsiveness(nodeID, localGroupId); - wasMemberUnresponsive = true; + //sleep for a while to avoid spinning + try { + Thread.sleep(this.heartbeatWarningMargin); + } catch (InterruptedException e1) { + log.error("Error while sleeping the thread", e1); + } + } + } + + public boolean isMemberUnresponsive() { + lock.lock(); + try { + return wasMemberUnresponsive; + } finally { + lock.unlock(); + } + } + + public void setMemberUnresponsiveIfNeeded(String nodeID, String localGroupId, boolean setUnresponsive) { + lock.lock(); + try { + if (setUnresponsive) { + if (!wasMemberUnresponsive) { + log.warn("Node [" + nodeID + "] in group [" + localGroupId + "] has become unresponsive."); + notifyUnresponsiveness(nodeID, localGroupId); + wasMemberUnresponsive = true; + } + } else { + wasMemberUnresponsive = false; } + } finally { + lock.unlock(); } } private List getMembershipEvents() throws ClusterCoordinationException { try { Future> listFuture = executor.submit(this::readMembershipEvents); - return listFuture.get(heartbeatWarningMargin, TimeUnit.MILLISECONDS); + return listFuture.get(maxDBReadTime, TimeUnit.MILLISECONDS); } catch (TimeoutException | InterruptedException | ExecutionException e) { throw new ClusterCoordinationException("Reading membership events took more time than max timeout retry interval", e); } @@ -187,11 +214,16 @@ private void notifyMemberAdditionEvent(String member, String groupId) { * * @param member The node ID of the event occurred */ - private void notifyRejoin(String member, String groupId) { + public void notifyRejoin(String member, String groupId) { listeners.forEach(listener -> { if (listener.getGroupId().equals(groupId)) { - listener.reJoined(member); + listener.reJoined(member, (nodeId, e) -> { + if (nodeId.equals(RDBMSMemberEventListenerTask.this.nodeID)) { + log.warn("Node [" + nodeId + "] became unresponsive due to an exception.", e); + setMemberUnresponsiveIfNeeded(nodeId, localGroupId, true); + } + }); } }); } diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventProcessor.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventProcessor.java index 75d4c2a361..7e20ec9a05 100644 --- a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventProcessor.java +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSMemberEventProcessor.java @@ -56,12 +56,12 @@ public class RDBMSMemberEventProcessor { private RDBMSCommunicationBusContextImpl communicationBusContext; public RDBMSMemberEventProcessor(String localNodeId, String localGroupId, int heartbeatMaxRetry, - RDBMSCommunicationBusContextImpl communicationBusContext) { + RDBMSCommunicationBusContextImpl communicationBusContext, long maxDBReadTime) { this.communicationBusContext = communicationBusContext; ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("ClusterEventReaderTask-%d").build(); this.clusterMembershipReaderTaskScheduler = Executors.newSingleThreadScheduledExecutor(namedThreadFactory); - addNewListenerTask(localNodeId, localGroupId, heartbeatMaxRetry); + addNewListenerTask(localNodeId, localGroupId, heartbeatMaxRetry, maxDBReadTime); } /** @@ -69,7 +69,7 @@ public RDBMSMemberEventProcessor(String localNodeId, String localGroupId, int he * * @param nodeId the node ID of the node which starts the listening */ - private void addNewListenerTask(String nodeId, String localGroupId, int heartbeatMaxRetry) { + private void addNewListenerTask(String nodeId, String localGroupId, int heartbeatMaxRetry, long maxDBReadTime) { int scheduledPeriod; String scheduledPeriodStr = System.getProperty(RDBMSConstantUtils.SCHEDULED_PERIOD); if (scheduledPeriodStr == null) { @@ -84,7 +84,8 @@ private void addNewListenerTask(String nodeId, String localGroupId, int heartbea scheduledPeriod = RDBMSConstantUtils.DEFAULT_SCHEDULED_PERIOD_INTERVAL; } } - membershipListenerTask = new RDBMSMemberEventListenerTask(nodeId, localGroupId, communicationBusContext, heartbeatMaxRetry); + membershipListenerTask = new RDBMSMemberEventListenerTask(nodeId, localGroupId, communicationBusContext, + heartbeatMaxRetry, maxDBReadTime); this.clusterMembershipReaderTaskScheduler.scheduleWithFixedDelay(membershipListenerTask, scheduledPeriod, scheduledPeriod, TimeUnit.MILLISECONDS); if (log.isDebugEnabled()) { @@ -122,6 +123,36 @@ public void addEventListener(MemberEventListener membershipListener) { membershipListenerTask.addEventListener(membershipListener); } + /** + * Set the unresponsiveness of a member if needed. + * + * @param nodeID the node ID of the member + * @param localGroupId the local group ID + * @param setUnresponsive flag to set unresponsiveness + */ + public void setMemberUnresponsiveIfNeeded(String nodeID, String localGroupId, boolean setUnresponsive) { + membershipListenerTask.setMemberUnresponsiveIfNeeded(nodeID, localGroupId, setUnresponsive); + } + + /** + * Check whether the member has rejoined after being unresponsive. + * + * @return true if the member has rejoined after being unresponsive, false otherwise + */ + public boolean isMemberUnresponsive() { + return membershipListenerTask.isMemberUnresponsive(); + } + + /** + * Set the rejoin status of a member. + * + * @param nodeID the node ID of the member + * @param localGroupId the local group ID + */ + public void setMemberRejoined(String nodeID, String localGroupId) { + membershipListenerTask.notifyRejoin(nodeID, localGroupId); + } + /** * Remove a previously added listener. * From 35ff8b7141a8514bd44b6268a97827a0e7092113 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Fri, 20 Sep 2024 10:13:46 +0530 Subject: [PATCH 2/2] Fix review points --- .../coordination/task/TaskEventListener.java | 1 + .../RDBMSCommunicationBusContextImpl.java | 69 +++++++------------ .../RDBMSCoordinationStrategy.java | 3 +- 3 files changed, 27 insertions(+), 46 deletions(-) diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java index a43730e470..89346562d5 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java @@ -112,6 +112,7 @@ public void becameUnresponsive(String nodeId) { ScheduledExecutorService taskScheduler = dataHolder.getTaskScheduler(); if (taskScheduler != null) { LOG.info("Shutting down coordinated task scheduler scheduler since the node became unresponsive."); + // Shutdown the task scheduler forcefully since node became unresponsive and need to avoid task duplication. taskScheduler.shutdownNow(); dataHolder.setTaskScheduler(null); } diff --git a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java index d4744a0de6..f470c2fb5b 100644 --- a/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java +++ b/components/org.wso2.micro.integrator.coordination/src/main/java/org/wso2/micro/integrator/coordination/RDBMSCommunicationBusContextImpl.java @@ -97,11 +97,7 @@ public void storeMembershipEvent(String changedMember, String groupId, List