Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport for 2.x #1453

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus

@Suppress("BlockingMethodInNonBlockingContext")
override fun asyncShardOperation(request: GetChangesRequest, shardId: ShardId, listener: ActionListener<GetChangesResponse>) {
log.debug("calling asyncShardOperation method")
GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) {
// TODO: Figure out if we need to acquire a primary permit here
log.debug("$REPLICATION_EXECUTOR_NAME_LEADER coroutine has initiated")
listener.completeWith {
var relativeStartNanos = System.nanoTime()
remoteStatsService.stats[shardId] = remoteStatsService.stats.getOrDefault(shardId, RemoteShardMetric())
Expand All @@ -82,7 +84,9 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
log.trace("Waiting for global checkpoint to advance from ${request.fromSeqNo} Sequence Number")
val gcp = indexShard.waitForGlobalCheckpoint(request.fromSeqNo, WAIT_FOR_NEW_OPS_TIMEOUT)
log.trace("Waiting for global checkpoint to advance is finished for ${request.fromSeqNo} Sequence Number")

// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
// Any checks on the settings is followed by setup checks to ensure all relevant changes are
// present across the plugins
// validate index metadata on the leader cluster
log.debug("Fetching leader cluster state for ${request.leaderIndex} index.")
val leaderClusterState = getLeaderClusterState(request.leaderAlias, request.leaderIndex)
ValidationUtil.validateLeaderIndexState(request.leaderAlias, request.leaderIndex, leaderClusterState)

val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex)
log.debug("Leader settings were fetched for ${request.leaderIndex} index.")

if (leaderSettings.keySet().contains(ReplicationPlugin.REPLICATED_INDEX_SETTING.key) and
!leaderSettings.get(ReplicationPlugin.REPLICATED_INDEX_SETTING.key).isNullOrBlank()) {
Expand All @@ -113,7 +115,9 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
// Setup checks are successful and trigger replication for the index
// permissions evaluation to trigger replication is based on the current security context set
val internalReq = ReplicateIndexClusterManagerNodeRequest(user, request)
log.debug("Starting replication index action on current master node")
client.suspendExecute(ReplicateIndexClusterManagerNodeAction.INSTANCE, internalReq)
log.debug("Response of start replication action is returned")
ReplicateIndexResponse(true)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
throw OpenSearchStatusException("[FORBIDDEN] Replication START block is set", RestStatus.FORBIDDEN)
}

log.debug("Making request to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster")
val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.leaderAlias, replicateIndexReq.leaderIndex)
log.debug("Response returned of the request made to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster")

if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) {
throw IllegalArgumentException("Cant use same index again for replication. " +
Expand All @@ -115,6 +117,7 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
ReplicationOverallState.RUNNING, user, replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE, null),
replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.LEADER_CLUSTER_ROLE, null), replicateIndexReq.settings)

log.debug("Starting index replication task in persistent task service with name: replication:index:${replicateIndexReq.followerIndex}")
val task = persistentTasksService.startTask("replication:index:${replicateIndexReq.followerIndex}",
IndexReplicationExecutor.TASK_NAME, params)

Expand All @@ -123,13 +126,15 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
listener.onResponse(ReplicateIndexResponse(false))
}

log.debug("Waiting for persistent task to move to following state")
// Now wait for the replication to start and the follower index to get created before returning
persistentTasksService.waitForTaskCondition(task.id, replicateIndexReq.timeout()) { t ->
val replicationState = (t.state as IndexReplicationState?)?.state
replicationState == ReplicationState.FOLLOWING ||
(!replicateIndexReq.waitForRestore && replicationState == ReplicationState.RESTORING)
}

log.debug("Persistent task is moved to following replication state")
listener.onResponse(AcknowledgedResponse(true))
} catch (e: Exception) {
log.error("Failed to trigger replication for ${replicateIndexReq.followerIndex} - ${e.stackTraceToString()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
if (!response.isAcknowledged) {
throw ReplicationException("Failed to auto follow leader index $leaderIndex")
}
log.debug("Auto follow has started replication from ${leaderAlias}:$leaderIndex -> $leaderIndex")
successStart = true
} catch (e: OpenSearchSecurityException) {
// For permission related failures, Adding as part of failed indices as autofollow role doesn't have required permissions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
logDebug("Cluster metadata listener invoked on shard task...")
if (event.metadataChanged()) {
val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerShardId.indexName)
logDebug("Replication State Params are fetched from cluster state")
if (replicationStateParams == null) {
if (PersistentTasksNodeService.Status(State.STARTED) == status)
cancelTask("Shard replication task received an interrupt.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
var retryException: Exception
repeat(numberOfRetries - 1) { index ->
try {
log.debug("Sending get changes request after ${currentBackoff / 1000} seconds.")
return suspendExecute(replicationMetadata, action, req,
injectSecurityContext = injectSecurityContext, defaultContext = defaultContext)
} catch (e: OpenSearchException) {
Expand Down
Loading