Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enables locking on an arbitrary lockID #164

Merged
merged 7 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class LockModel implements ToXContentObject {
* @param primaryTerm primary term from OpenSearch document.
*/
public LockModel(final LockModel copyLock, long seqNo, long primaryTerm) {
this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockTime, copyLock.lockDurationSeconds,
this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, copyLock.lockTime, copyLock.lockDurationSeconds,
copyLock.released, seqNo, primaryTerm);
}

Expand All @@ -54,7 +54,7 @@ public LockModel(final LockModel copyLock, long seqNo, long primaryTerm) {
* @param released boolean flag to indicate if the lock is released
*/
public LockModel(final LockModel copyLock, final boolean released) {
this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockTime, copyLock.lockDurationSeconds,
this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, copyLock.lockTime, copyLock.lockDurationSeconds,
released, copyLock.seqNo, copyLock.primaryTerm);
}

Expand All @@ -68,14 +68,20 @@ public LockModel(final LockModel copyLock, final boolean released) {
*/
public LockModel(final LockModel copyLock,
final Instant updateLockTime, final long lockDurationSeconds, final boolean released) {
this(copyLock.jobIndexName, copyLock.jobId, updateLockTime, lockDurationSeconds, released, copyLock.seqNo, copyLock.primaryTerm);
this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, updateLockTime, lockDurationSeconds, released,
copyLock.seqNo, copyLock.primaryTerm);
}

public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockDurationSeconds, boolean released) {
this(jobIndexName, jobId, lockTime, lockDurationSeconds, released,
SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public LockModel(String jobIndexName, String jobId, String lockId, Instant lockTime, long lockDurationSeconds, boolean released) {
this(jobIndexName, jobId, lockId, lockTime, lockDurationSeconds, released,
SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public LockModel(String jobIndexName, String jobId, Instant lockTime,
long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) {
this.lockId = jobIndexName + LOCK_ID_DELIMITR + jobId;
Expand All @@ -88,6 +94,18 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime,
this.primaryTerm = primaryTerm;
}

public LockModel(String jobIndexName, String jobId, String lockId, Instant lockTime,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we want both jobId and lockId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I think that the only reason to include the job ID would be if we wanted to delete the leftover locks after a job is deleted. If we instead moved to a periodic job deleting stale locks, then I think there is no point in having both. I'll make the change.

long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) {
this.lockId = lockId;
this.jobIndexName = jobIndexName;
this.jobId = jobId;
this.lockTime = lockTime;
this.lockDurationSeconds = lockDurationSeconds;
this.released = released;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

public static String generateLockId(String jobIndexName, String jobId) {
return jobIndexName + LOCK_ID_DELIMITR + jobId;
}
Expand Down Expand Up @@ -135,6 +153,23 @@ public static LockModel parse(final XContentParser parser, long seqNo, long prim
);
}

/*
* Parses the LockModel while also taking in the lockID, which will be the document ID of the lock being parsed.
*/
public static LockModel parseWithID(final XContentParser parser, long seqNo, long primaryTerm, String lockId) throws IOException {
LockModel lockNoID = parse(parser, seqNo, primaryTerm);
return new LockModel(
lockNoID.jobIndexName,
lockNoID.jobId,
requireNonNull(lockId, "LockId cannot be when explicitly provided in the parser"),
lockNoID.lockTime,
lockNoID.lockDurationSeconds,
lockNoID.released,
seqNo,
primaryTerm
);
}

@Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject()
.field(JOB_INDEX_NAME, this.jobIndexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ public void acquireLock(final ScheduledJobParameter jobParameter,
final JobExecutionContext context, ActionListener<LockModel> listener) {
final String jobIndexName = context.getJobIndexName();
final String jobId = context.getJobId();
String lockId = LockModel.generateLockId(jobIndexName, jobId);
acquireLockWithId(jobParameter, context, lockId, listener);
}

/**
* Attempts to acquire a lock with a specific lock Id. If the lock does not exist it attempts to create the lock document.
* If the Lock document exists, it will try to update and acquire the lock.
*
* @param jobParameter a {@code ScheduledJobParameter} containing the lock duration.
* @param context a {@code JobExecutionContext} containing job index name and job id.
* @param lockId the unique Id for the lock. This should represent the resource that the lock is on, whether it be
* a job, or some other arbitrary resource.
* @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the lock if it was acquired
* or else null. Passes {@code IllegalArgumentException} to onFailure if the {@code ScheduledJobParameter} does not
* have {@code LockDurationSeconds}.
*/
public void acquireLockWithId(final ScheduledJobParameter jobParameter,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If given a lockId by the caller, I think we should still prefix it with the job index name to preserve namespaces, otherwise if two different plugins using this end up using the same lockId at any point then they'll run into issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am happy to make this change if you want, but that behavior also enables locking resources across plugins. Any plugin that doesn't want this could just prefix their own id to namespace it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a use case for that yet? If not then we can add in support for it when needed, but as of right now relying on the consumers to correctly namespace the lockId leaves room for the developers who are not aware of it to accidentally misuse and cause issues across plugins.

@thalurur Any thoughts on it?

Copy link
Contributor

@thalurur thalurur Apr 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see much benefit in enforcing a global lock since this is not a standard that all plugins need to follow and one plugin assuming that its locking globally using this mechanism might be really not enforcable and I don't think a plugin should assume that.

Keeping it at the a namespace always by job scheduler makes more sense since its contained to that use case and if plugins want to further scope it down they can add more prefixes for the feature in the namespace when creating lock Ids

final JobExecutionContext context,
final String lockId,
ActionListener<LockModel> listener) {
final String jobIndexName = context.getJobIndexName();
final String jobId = context.getJobId();
if (jobParameter.getLockDurationSeconds() == null) {
listener.onFailure(new IllegalArgumentException("Job LockDuration should not be null"));
} else {
Expand All @@ -114,7 +136,7 @@ public void acquireLock(final ScheduledJobParameter jobParameter,
created -> {
if (created) {
try {
findLock(LockModel.generateLockId(jobIndexName, jobId), ActionListener.wrap(
findLock(lockId, ActionListener.wrap(
existingLock -> {
if (existingLock != null) {
if (isLockReleasedOrExpired(existingLock)) {
Expand All @@ -130,7 +152,7 @@ public void acquireLock(final ScheduledJobParameter jobParameter,
}
} else {
// There is no lock object and it is first time. Create new lock.
LockModel tempLock = new LockModel(jobIndexName, jobId, getNow(),
LockModel tempLock = new LockModel(jobIndexName, jobId, lockId, getNow(),
lockDurationSecond, false);
logger.debug("Lock does not exist. Creating new lock" + tempLock);
createLock(tempLock, listener);
Expand Down Expand Up @@ -227,7 +249,7 @@ private void findLock(final String lockId, ActionListener<LockModel> listener) {
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
response.getSourceAsString());
parser.nextToken();
listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm()));
listener.onResponse(LockModel.parseWithID(parser, response.getSeqNo(), response.getPrimaryTerm(), response.getId()));
} catch (IOException e) {
logger.error("IOException occurred finding lock", e);
listener.onResponse(null);
Expand Down
Loading