Skip to content

Commit

Permalink
Enables locking on an arbitrary lockID (#164)
Browse files Browse the repository at this point in the history
* Enables locking on an arbitrary lockID

Signed-off-by: Clay Downs <downsrob@amazon.com>
  • Loading branch information
downsrob authored Apr 7, 2022
1 parent a1f93f1 commit 2a49d38
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime,
long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) {
this.lockId = jobIndexName + LOCK_ID_DELIMITR + jobId;
this.jobIndexName = jobIndexName;
// The jobId parameter does not necessarily need to represent the id of a job scheduler job, as it is being used
// to scope the lock, and could represent any resource.
this.jobId = jobId;
this.lockTime = lockTime;
this.lockDurationSeconds = lockDurationSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,46 @@ public void acquireLock(final ScheduledJobParameter jobParameter,
final JobExecutionContext context, ActionListener<LockModel> listener) {
final String jobIndexName = context.getJobIndexName();
final String jobId = context.getJobId();
if (jobParameter.getLockDurationSeconds() == null) {
final long lockDurationSeconds = jobParameter.getLockDurationSeconds();
acquireLockWithId(jobIndexName, lockDurationSeconds, jobId, listener);
}

/**
* Attempts to acquire a lock with a specific lock Id. If the lock does not exist it attempts to create the lock document.
* If the Lock document exists, it will try to update and acquire the lock.
*
* @param jobIndexName a non-null job index name.
* @param lockDurationSeconds the amount of time in seconds that the lock should exist
* @param lockId the unique Id for the lock. This should represent the resource that the lock is on, whether it be
* a job, or some other arbitrary resource. If the lockID matches a jobID, then the lock will be deleted
* when the job is deleted.
* @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the lock if it was acquired
* or else null. Passes {@code IllegalArgumentException} to onFailure if the {@code ScheduledJobParameter} does not
* have {@code LockDurationSeconds}.
*/
public void acquireLockWithId(final String jobIndexName,
final Long lockDurationSeconds,
final String lockId,
ActionListener<LockModel> listener) {
if (lockDurationSeconds == null) {
listener.onFailure(new IllegalArgumentException("Job LockDuration should not be null"));
} else if (jobIndexName == null) {
listener.onFailure(new IllegalArgumentException("Job index name should not be null"));
} else if (lockId == null) {
listener.onFailure(new IllegalArgumentException("Lock ID should not be null"));
} else {
final long lockDurationSecond = jobParameter.getLockDurationSeconds();
createLockIndex(ActionListener.wrap(
created -> {
if (created) {
try {
findLock(LockModel.generateLockId(jobIndexName, jobId), ActionListener.wrap(
findLock(LockModel.generateLockId(jobIndexName, lockId), ActionListener.wrap(
existingLock -> {
if (existingLock != null) {
if (isLockReleasedOrExpired(existingLock)) {
// Lock is expired. Attempt to acquire lock.
logger.debug("lock is released or expired: " + existingLock);
LockModel updateLock = new LockModel(existingLock, getNow(),
lockDurationSecond, false);
lockDurationSeconds, false);
updateLock(updateLock, listener);
} else {
logger.debug("Lock is NOT released or expired. " + existingLock);
Expand All @@ -130,8 +154,9 @@ public void acquireLock(final ScheduledJobParameter jobParameter,
}
} else {
// There is no lock object and it is first time. Create new lock.
LockModel tempLock = new LockModel(jobIndexName, jobId, getNow(),
lockDurationSecond, false);
// Note that the lockID will be set to {jobIndexName}-{lockId}
LockModel tempLock = new LockModel(jobIndexName, lockId, getNow(),
lockDurationSeconds, false);
logger.debug("Lock does not exist. Creating new lock" + tempLock);
createLock(tempLock, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package org.opensearch.jobscheduler.spi.utils;

import org.junit.Before;
import org.junit.Ignore;
import org.mockito.Mockito;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.spi.JobDocVersion;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.junit.Ignore;
import org.mockito.Mockito;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -117,17 +117,56 @@ public void testSanity() throws Exception {
latch.await(5L, TimeUnit.SECONDS);
}

public void testSanityWithCustomLockID() throws Exception {
String lockID = "sanity_test_lock";
String uniqSuffix = "_sanity";
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);
Instant testTime = Instant.now();
lockService.setTime(testTime);
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock.", lock);
assertEquals("job_id does not match.", lockID, lock.getJobId());
assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName());
assertEquals("lock_id does not match.", lock.getJobIndexName() + "-" + lockID, lock.getLockId());
assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds());
assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond());
assertFalse("Lock should not be released.", lock.isReleased());
assertFalse("Lock should not expire.", lock.isExpired());
lockService.release(lock, ActionListener.wrap(
released -> {
assertTrue("Failed to release lock.", released);
lockService.deleteLock(lock.getLockId(), ActionListener.wrap(
deleted -> {
assertTrue("Failed to delete lock.", deleted);
latch.countDown();
},
exception -> fail(exception.getMessage())
));
},
exception -> fail(exception.getMessage())
));
},
exception -> fail(exception.getMessage())
));
latch.await(5L, TimeUnit.SECONDS);
}

public void testSecondAcquireLockFail() throws Exception {
String uniqSuffix = "_second_acquire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);

lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock2 -> {
assertNull("Expected to failed to get lock.", lock2);
lockService.release(lock, ActionListener.wrap(
Expand All @@ -154,18 +193,19 @@ public void testSecondAcquireLockFail() throws Exception {

public void testLockReleasedAndAcquired() throws Exception {
String uniqSuffix = "_lock_release+acquire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);

lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
lockService.release(lock, ActionListener.wrap(
released -> {
assertTrue("Failed to release lock.", released);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock2 -> {
assertNotNull("Expected to successfully grab lock2", lock2);
lockService.release(lock2, ActionListener.wrap(
Expand Down Expand Up @@ -195,20 +235,20 @@ public void testLockReleasedAndAcquired() throws Exception {

public void testLockExpired() throws Exception {
String uniqSuffix = "_lock_expire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);


lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
// Set lock back to current time to make the lock expire.
lockService.setTime(null);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock2 -> {
assertNotNull("Expected to successfully grab lock", lock2);
lockService.release(lock, ActionListener.wrap(
Expand Down Expand Up @@ -280,20 +320,20 @@ public void testDeleteNonExistingLock() throws Exception {
@Ignore
public void testMultiThreadCreateLock() throws Exception {
String uniqSuffix = "_multi_thread_create";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);


lockService.createLockIndex(ActionListener.wrap(
created -> {
if (created) {
ExecutorService executor = Executors.newFixedThreadPool(3);
final AtomicReference<LockModel> lockModelAtomicReference = new AtomicReference<>(null);
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Expand Down Expand Up @@ -355,6 +395,7 @@ public void testMultiThreadCreateLock() throws Exception {
@Ignore
public void testMultiThreadAcquireLock() throws Exception {
String uniqSuffix = "_multi_thread_acquire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
Expand All @@ -365,7 +406,7 @@ public void testMultiThreadAcquireLock() throws Exception {
if (created) {
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
createdLock -> {
assertNotNull(createdLock);
// Set lock back to current time to make the lock expire.
Expand All @@ -375,7 +416,7 @@ public void testMultiThreadAcquireLock() throws Exception {
final AtomicReference<LockModel> lockModelAtomicReference = new AtomicReference<>(null);
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Expand Down Expand Up @@ -430,12 +471,13 @@ public void testMultiThreadAcquireLock() throws Exception {

public void testRenewLock() throws Exception {
String uniqSuffix = "_lock_renew";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);

lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
// Set the time of LockService (the 'lockTime' of acquired locks) to a fixed time.
Expand Down

0 comments on commit 2a49d38

Please sign in to comment.