diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java index 2ff8876a..767d6054 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java @@ -80,6 +80,8 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) { this.lockId = jobIndexName + LOCK_ID_DELIMITR + jobId; this.jobIndexName = jobIndexName; + // The jobId parameter does not necessarily need to represent the id of a job scheduler job, as it is being used + // to scope the lock, and could represent any resource. this.jobId = jobId; this.lockTime = lockTime; this.lockDurationSeconds = lockDurationSeconds; diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 435135c2..9b135bb4 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -106,22 +106,46 @@ public void acquireLock(final ScheduledJobParameter jobParameter, final JobExecutionContext context, ActionListener listener) { final String jobIndexName = context.getJobIndexName(); final String jobId = context.getJobId(); - if (jobParameter.getLockDurationSeconds() == null) { + final long lockDurationSeconds = jobParameter.getLockDurationSeconds(); + acquireLockWithId(jobIndexName, lockDurationSeconds, jobId, listener); + } + + /** + * Attempts to acquire a lock with a specific lock Id. If the lock does not exist it attempts to create the lock document. + * If the Lock document exists, it will try to update and acquire the lock. + * + * @param jobIndexName a non-null job index name. + * @param lockDurationSeconds the amount of time in seconds that the lock should exist + * @param lockId the unique Id for the lock. This should represent the resource that the lock is on, whether it be + * a job, or some other arbitrary resource. If the lockID matches a jobID, then the lock will be deleted + * when the job is deleted. + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the lock if it was acquired + * or else null. Passes {@code IllegalArgumentException} to onFailure if the {@code ScheduledJobParameter} does not + * have {@code LockDurationSeconds}. + */ + public void acquireLockWithId(final String jobIndexName, + final Long lockDurationSeconds, + final String lockId, + ActionListener listener) { + if (lockDurationSeconds == null) { listener.onFailure(new IllegalArgumentException("Job LockDuration should not be null")); + } else if (jobIndexName == null) { + listener.onFailure(new IllegalArgumentException("Job index name should not be null")); + } else if (lockId == null) { + listener.onFailure(new IllegalArgumentException("Lock ID should not be null")); } else { - final long lockDurationSecond = jobParameter.getLockDurationSeconds(); createLockIndex(ActionListener.wrap( created -> { if (created) { try { - findLock(LockModel.generateLockId(jobIndexName, jobId), ActionListener.wrap( + findLock(LockModel.generateLockId(jobIndexName, lockId), ActionListener.wrap( existingLock -> { if (existingLock != null) { if (isLockReleasedOrExpired(existingLock)) { // Lock is expired. Attempt to acquire lock. logger.debug("lock is released or expired: " + existingLock); LockModel updateLock = new LockModel(existingLock, getNow(), - lockDurationSecond, false); + lockDurationSeconds, false); updateLock(updateLock, listener); } else { logger.debug("Lock is NOT released or expired. " + existingLock); @@ -130,8 +154,9 @@ public void acquireLock(final ScheduledJobParameter jobParameter, } } else { // There is no lock object and it is first time. Create new lock. - LockModel tempLock = new LockModel(jobIndexName, jobId, getNow(), - lockDurationSecond, false); + // Note that the lockID will be set to {jobIndexName}-{lockId} + LockModel tempLock = new LockModel(jobIndexName, lockId, getNow(), + lockDurationSeconds, false); logger.debug("Lock does not exist. Creating new lock" + tempLock); createLock(tempLock, listener); } diff --git a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java index 48699aa3..20c0c293 100644 --- a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java +++ b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java @@ -5,18 +5,18 @@ package org.opensearch.jobscheduler.spi.utils; +import org.junit.Before; +import org.junit.Ignore; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.jobscheduler.spi.JobDocVersion; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.schedule.Schedule; -import org.opensearch.action.ActionListener; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; -import org.junit.Ignore; -import org.mockito.Mockito; import java.io.IOException; import java.time.Duration; @@ -117,17 +117,56 @@ public void testSanity() throws Exception { latch.await(5L, TimeUnit.SECONDS); } + public void testSanityWithCustomLockID() throws Exception { + String lockID = "sanity_test_lock"; + String uniqSuffix = "_sanity"; + CountDownLatch latch = new CountDownLatch(1); + LockService lockService = new LockService(client(), this.clusterService); + final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + Instant testTime = Instant.now(); + lockService.setTime(testTime); + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( + lock -> { + assertNotNull("Expected to successfully grab lock.", lock); + assertEquals("job_id does not match.", lockID, lock.getJobId()); + assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName()); + assertEquals("lock_id does not match.", lock.getJobIndexName() + "-" + lockID, lock.getLockId()); + assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds()); + assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond()); + assertFalse("Lock should not be released.", lock.isReleased()); + assertFalse("Lock should not expire.", lock.isExpired()); + lockService.release(lock, ActionListener.wrap( + released -> { + assertTrue("Failed to release lock.", released); + lockService.deleteLock(lock.getLockId(), ActionListener.wrap( + deleted -> { + assertTrue("Failed to delete lock.", deleted); + latch.countDown(); + }, + exception -> fail(exception.getMessage()) + )); + }, + exception -> fail(exception.getMessage()) + )); + }, + exception -> fail(exception.getMessage()) + )); + latch.await(5L, TimeUnit.SECONDS); + } + public void testSecondAcquireLockFail() throws Exception { String uniqSuffix = "_second_acquire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock2 -> { assertNull("Expected to failed to get lock.", lock2); lockService.release(lock, ActionListener.wrap( @@ -154,18 +193,19 @@ public void testSecondAcquireLockFail() throws Exception { public void testLockReleasedAndAcquired() throws Exception { String uniqSuffix = "_lock_release+acquire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); lockService.release(lock, ActionListener.wrap( released -> { assertTrue("Failed to release lock.", released); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock2 -> { assertNotNull("Expected to successfully grab lock2", lock2); lockService.release(lock2, ActionListener.wrap( @@ -195,6 +235,7 @@ public void testLockReleasedAndAcquired() throws Exception { public void testLockExpired() throws Exception { String uniqSuffix = "_lock_expire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); // Set lock time in the past. @@ -202,13 +243,12 @@ public void testLockExpired() throws Exception { final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); // Set lock back to current time to make the lock expire. lockService.setTime(null); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock2 -> { assertNotNull("Expected to successfully grab lock", lock2); lockService.release(lock, ActionListener.wrap( @@ -280,12 +320,12 @@ public void testDeleteNonExistingLock() throws Exception { @Ignore public void testMultiThreadCreateLock() throws Exception { String uniqSuffix = "_multi_thread_create"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.createLockIndex(ActionListener.wrap( created -> { if (created) { @@ -293,7 +333,7 @@ public void testMultiThreadCreateLock() throws Exception { final AtomicReference lockModelAtomicReference = new AtomicReference<>(null); Callable callable = () -> { CountDownLatch callableLatch = new CountDownLatch(1); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { if (lock != null) { lockModelAtomicReference.set(lock); @@ -355,6 +395,7 @@ public void testMultiThreadCreateLock() throws Exception { @Ignore public void testMultiThreadAcquireLock() throws Exception { String uniqSuffix = "_multi_thread_acquire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), @@ -365,7 +406,7 @@ public void testMultiThreadAcquireLock() throws Exception { if (created) { // Set lock time in the past. lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS))); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( createdLock -> { assertNotNull(createdLock); // Set lock back to current time to make the lock expire. @@ -375,7 +416,7 @@ public void testMultiThreadAcquireLock() throws Exception { final AtomicReference lockModelAtomicReference = new AtomicReference<>(null); Callable callable = () -> { CountDownLatch callableLatch = new CountDownLatch(1); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { if (lock != null) { lockModelAtomicReference.set(lock); @@ -430,12 +471,13 @@ public void testMultiThreadAcquireLock() throws Exception { public void testRenewLock() throws Exception { String uniqSuffix = "_lock_renew"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); // Set the time of LockService (the 'lockTime' of acquired locks) to a fixed time.