From 68665733a8a4ed8770cd57f6022f6cba3926d98f Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Mon, 4 Apr 2022 02:24:16 +0000 Subject: [PATCH 1/7] Allows lockID to be set from constructor Signed-off-by: Clay Downs --- .../jobscheduler/spi/LockModel.java | 24 ++++++- .../jobscheduler/spi/utils/LockService.java | 27 +++++++- .../jobscheduler/spi/utils/LockServiceIT.java | 65 +++++++++++++++---- 3 files changed, 100 insertions(+), 16 deletions(-) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java index 2ff8876a..c5f5c331 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java @@ -43,7 +43,7 @@ public final class LockModel implements ToXContentObject { * @param primaryTerm primary term from OpenSearch document. */ public LockModel(final LockModel copyLock, long seqNo, long primaryTerm) { - this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockTime, copyLock.lockDurationSeconds, + this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, copyLock.lockTime, copyLock.lockDurationSeconds, copyLock.released, seqNo, primaryTerm); } @@ -54,7 +54,7 @@ public LockModel(final LockModel copyLock, long seqNo, long primaryTerm) { * @param released boolean flag to indicate if the lock is released */ public LockModel(final LockModel copyLock, final boolean released) { - this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockTime, copyLock.lockDurationSeconds, + this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, copyLock.lockTime, copyLock.lockDurationSeconds, released, copyLock.seqNo, copyLock.primaryTerm); } @@ -68,7 +68,8 @@ public LockModel(final LockModel copyLock, final boolean released) { */ public LockModel(final LockModel copyLock, final Instant updateLockTime, final long lockDurationSeconds, final boolean released) { - this(copyLock.jobIndexName, copyLock.jobId, updateLockTime, lockDurationSeconds, released, copyLock.seqNo, copyLock.primaryTerm); + this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, updateLockTime, lockDurationSeconds, released, + copyLock.seqNo, copyLock.primaryTerm); } public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockDurationSeconds, boolean released) { @@ -76,6 +77,11 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockD SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); } + public LockModel(String jobIndexName, String jobId, String lockId, Instant lockTime, long lockDurationSeconds, boolean released) { + this(jobIndexName, jobId, lockId, lockTime, lockDurationSeconds, released, + SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + } + public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) { this.lockId = jobIndexName + LOCK_ID_DELIMITR + jobId; @@ -88,6 +94,18 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime, this.primaryTerm = primaryTerm; } + public LockModel(String jobIndexName, String jobId, String lockId, Instant lockTime, + long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) { + this.lockId = lockId; + this.jobIndexName = jobIndexName; + this.jobId = jobId; + this.lockTime = lockTime; + this.lockDurationSeconds = lockDurationSeconds; + this.released = released; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + public static String generateLockId(String jobIndexName, String jobId) { return jobIndexName + LOCK_ID_DELIMITR + jobId; } diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 435135c2..f77d07df 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -38,6 +38,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.Map; public final class LockService { private static final Logger logger = LogManager.getLogger(LockService.class); @@ -106,6 +107,28 @@ public void acquireLock(final ScheduledJobParameter jobParameter, final JobExecutionContext context, ActionListener listener) { final String jobIndexName = context.getJobIndexName(); final String jobId = context.getJobId(); + String lockId = LockModel.generateLockId(jobIndexName, jobId); + acquireLockWithId(jobParameter, context, lockId, listener); + } + + /** + * Attempts to acquire a lock with a specific lock Id. If the lock does not exist it attempts to create the lock document. + * If the Lock document exists, it will try to update and acquire the lock. + * + * @param context a {@code JobExecutionContext} containing job index name and job id. + * @param lockDurationSeconds the amount of time in seconds that the lock should exist + * @param lockId the unique Id for the lock. This should represent the resource that the lock is on, whether it be + * a job, or some other arbitrary resource. + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the lock if it was acquired + * or else null. Passes {@code IllegalArgumentException} to onFailure if the {@code ScheduledJobParameter} does not + * have {@code LockDurationSeconds}. + */ + public void acquireLockWithId(final ScheduledJobParameter jobParameter, + final JobExecutionContext context, + final String lockId, + ActionListener listener) { + final String jobIndexName = context.getJobIndexName(); + final String jobId = context.getJobId(); if (jobParameter.getLockDurationSeconds() == null) { listener.onFailure(new IllegalArgumentException("Job LockDuration should not be null")); } else { @@ -114,7 +137,7 @@ public void acquireLock(final ScheduledJobParameter jobParameter, created -> { if (created) { try { - findLock(LockModel.generateLockId(jobIndexName, jobId), ActionListener.wrap( + findLock(lockId, ActionListener.wrap( existingLock -> { if (existingLock != null) { if (isLockReleasedOrExpired(existingLock)) { @@ -130,7 +153,7 @@ public void acquireLock(final ScheduledJobParameter jobParameter, } } else { // There is no lock object and it is first time. Create new lock. - LockModel tempLock = new LockModel(jobIndexName, jobId, getNow(), + LockModel tempLock = new LockModel(jobIndexName, jobId, lockId, getNow(), lockDurationSecond, false); logger.debug("Lock does not exist. Creating new lock" + tempLock); createLock(tempLock, listener); diff --git a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java index 48699aa3..fc27f7f8 100644 --- a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java +++ b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java @@ -117,17 +117,56 @@ public void testSanity() throws Exception { latch.await(5L, TimeUnit.SECONDS); } + public void testSanityWithCustomLockID() throws Exception { + String lockID = "sanity_test_lock"; + String uniqSuffix = "_sanity"; + CountDownLatch latch = new CountDownLatch(1); + LockService lockService = new LockService(client(), this.clusterService); + final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + Instant testTime = Instant.now(); + lockService.setTime(testTime); + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lock -> { + assertNotNull("Expected to successfully grab lock.", lock); + assertEquals("job_id does not match.", JOB_ID + uniqSuffix, lock.getJobId()); + assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName()); + assertEquals("lock_id does not match.", lockID, lock.getLockId()); + assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds()); + assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond()); + assertFalse("Lock should not be released.", lock.isReleased()); + assertFalse("Lock should not expire.", lock.isExpired()); + lockService.release(lock, ActionListener.wrap( + released -> { + assertTrue("Failed to release lock.", released); + lockService.deleteLock(lock.getLockId(), ActionListener.wrap( + deleted -> { + assertTrue("Failed to delete lock.", deleted); + latch.countDown(); + }, + exception -> fail(exception.getMessage()) + )); + }, + exception -> fail(exception.getMessage()) + )); + }, + exception -> fail(exception.getMessage()) + )); + latch.await(5L, TimeUnit.SECONDS); + } + public void testSecondAcquireLockFail() throws Exception { String uniqSuffix = "_second_acquire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock2 -> { assertNull("Expected to failed to get lock.", lock2); lockService.release(lock, ActionListener.wrap( @@ -154,18 +193,19 @@ public void testSecondAcquireLockFail() throws Exception { public void testLockReleasedAndAcquired() throws Exception { String uniqSuffix = "_lock_release+acquire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); lockService.release(lock, ActionListener.wrap( released -> { assertTrue("Failed to release lock.", released); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock2 -> { assertNotNull("Expected to successfully grab lock2", lock2); lockService.release(lock2, ActionListener.wrap( @@ -195,6 +235,7 @@ public void testLockReleasedAndAcquired() throws Exception { public void testLockExpired() throws Exception { String uniqSuffix = "_lock_expire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); // Set lock time in the past. @@ -203,12 +244,12 @@ public void testLockExpired() throws Exception { lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); // Set lock back to current time to make the lock expire. lockService.setTime(null); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock2 -> { assertNotNull("Expected to successfully grab lock", lock2); lockService.release(lock, ActionListener.wrap( @@ -280,12 +321,12 @@ public void testDeleteNonExistingLock() throws Exception { @Ignore public void testMultiThreadCreateLock() throws Exception { String uniqSuffix = "_multi_thread_create"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.createLockIndex(ActionListener.wrap( created -> { if (created) { @@ -293,7 +334,7 @@ public void testMultiThreadCreateLock() throws Exception { final AtomicReference lockModelAtomicReference = new AtomicReference<>(null); Callable callable = () -> { CountDownLatch callableLatch = new CountDownLatch(1); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { if (lock != null) { lockModelAtomicReference.set(lock); @@ -355,6 +396,7 @@ public void testMultiThreadCreateLock() throws Exception { @Ignore public void testMultiThreadAcquireLock() throws Exception { String uniqSuffix = "_multi_thread_acquire"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), @@ -365,7 +407,7 @@ public void testMultiThreadAcquireLock() throws Exception { if (created) { // Set lock time in the past. lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS))); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( createdLock -> { assertNotNull(createdLock); // Set lock back to current time to make the lock expire. @@ -375,7 +417,7 @@ public void testMultiThreadAcquireLock() throws Exception { final AtomicReference lockModelAtomicReference = new AtomicReference<>(null); Callable callable = () -> { CountDownLatch callableLatch = new CountDownLatch(1); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { if (lock != null) { lockModelAtomicReference.set(lock); @@ -430,12 +472,13 @@ public void testMultiThreadAcquireLock() throws Exception { public void testRenewLock() throws Exception { String uniqSuffix = "_lock_renew"; + String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( + lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); // Set the time of LockService (the 'lockTime' of acquired locks) to a fixed time. From e817aa596149f4fafcecb5a6a1ef4ca95ecc69c1 Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Mon, 4 Apr 2022 17:41:17 +0000 Subject: [PATCH 2/7] Adds parse with ID Signed-off-by: Clay Downs --- .../jobscheduler/spi/LockModel.java | 17 ++++++++++ .../jobscheduler/spi/utils/LockService.java | 3 +- .../jobscheduler/spi/utils/LockServiceIT.java | 31 ++++++++----------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java index c5f5c331..c947274a 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java @@ -153,6 +153,23 @@ public static LockModel parse(final XContentParser parser, long seqNo, long prim ); } + /* + * Parses the LockModel while also taking in the lockID, which will be the document ID of the lock being parsed. + */ + public static LockModel parseWithID(final XContentParser parser, long seqNo, long primaryTerm, String lockId) throws IOException { + LockModel lockNoID = parse(parser, seqNo, primaryTerm); + return new LockModel( + lockNoID.jobIndexName, + lockNoID.jobId, + requireNonNull(lockId, "LockId cannot be when explicitly provided in the parser"), + lockNoID.lockTime, + lockNoID.lockDurationSeconds, + lockNoID.released, + seqNo, + primaryTerm + ); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject() .field(JOB_INDEX_NAME, this.jobIndexName) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index f77d07df..ed349418 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -38,7 +38,6 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.Map; public final class LockService { private static final Logger logger = LogManager.getLogger(LockService.class); @@ -250,7 +249,7 @@ private void findLock(final String lockId, ActionListener listener) { .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); parser.nextToken(); - listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); + listener.onResponse(LockModel.parseWithID(parser, response.getSeqNo(), response.getPrimaryTerm(), response.getId())); } catch (IOException e) { logger.error("IOException occurred finding lock", e); listener.onResponse(null); diff --git a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java index fc27f7f8..33e056ad 100644 --- a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java +++ b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java @@ -5,29 +5,25 @@ package org.opensearch.jobscheduler.spi.utils; +import org.junit.Before; +import org.junit.Ignore; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.jobscheduler.spi.JobDocVersion; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.schedule.Schedule; -import org.opensearch.action.ActionListener; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; -import org.junit.Ignore; -import org.mockito.Mockito; import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -84,7 +80,7 @@ public void testSanity() throws Exception { CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); Instant testTime = Instant.now(); lockService.setTime(testTime); lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( @@ -161,7 +157,7 @@ public void testSecondAcquireLockFail() throws Exception { CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { @@ -197,7 +193,7 @@ public void testLockReleasedAndAcquired() throws Exception { CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { @@ -241,8 +237,7 @@ public void testLockExpired() throws Exception { // Set lock time in the past. lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS))); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { @@ -325,7 +320,7 @@ public void testMultiThreadCreateLock() throws Exception { CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.createLockIndex(ActionListener.wrap( created -> { @@ -400,7 +395,7 @@ public void testMultiThreadAcquireLock() throws Exception { CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.createLockIndex(ActionListener.wrap( created -> { From 243b62d412f201f22cbe04ab527b6e402704a220 Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Mon, 4 Apr 2022 17:45:35 +0000 Subject: [PATCH 3/7] Fixes spacing Signed-off-by: Clay Downs --- .../jobscheduler/spi/utils/LockServiceIT.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java index 33e056ad..106717d3 100644 --- a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java +++ b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java @@ -80,7 +80,7 @@ public void testSanity() throws Exception { CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); Instant testTime = Instant.now(); lockService.setTime(testTime); lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap( @@ -119,7 +119,7 @@ public void testSanityWithCustomLockID() throws Exception { CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); Instant testTime = Instant.now(); lockService.setTime(testTime); lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( @@ -157,7 +157,7 @@ public void testSecondAcquireLockFail() throws Exception { CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { @@ -193,7 +193,7 @@ public void testLockReleasedAndAcquired() throws Exception { CountDownLatch latch = new CountDownLatch(1); LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { @@ -237,7 +237,7 @@ public void testLockExpired() throws Exception { // Set lock time in the past. lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS))); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( lock -> { @@ -320,7 +320,7 @@ public void testMultiThreadCreateLock() throws Exception { CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.createLockIndex(ActionListener.wrap( created -> { @@ -395,7 +395,7 @@ public void testMultiThreadAcquireLock() throws Exception { CountDownLatch latch = new CountDownLatch(1); final LockService lockService = new LockService(client(), this.clusterService); final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), - lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); + lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); lockService.createLockIndex(ActionListener.wrap( created -> { From 68279c41ac0db6fbe1703af95e3c0f040272ebb1 Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Mon, 4 Apr 2022 17:50:49 +0000 Subject: [PATCH 4/7] Fixes javadoc Signed-off-by: Clay Downs --- .../java/org/opensearch/jobscheduler/spi/utils/LockService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index ed349418..0a82331c 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -114,8 +114,8 @@ public void acquireLock(final ScheduledJobParameter jobParameter, * Attempts to acquire a lock with a specific lock Id. If the lock does not exist it attempts to create the lock document. * If the Lock document exists, it will try to update and acquire the lock. * + * @param jobParameter a {@code ScheduledJobParameter} containing the lock duration. * @param context a {@code JobExecutionContext} containing job index name and job id. - * @param lockDurationSeconds the amount of time in seconds that the lock should exist * @param lockId the unique Id for the lock. This should represent the resource that the lock is on, whether it be * a job, or some other arbitrary resource. * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the lock if it was acquired From a05960dcdc8c1e2a5f47bdbf958192c13dda2200 Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Mon, 4 Apr 2022 18:00:12 +0000 Subject: [PATCH 5/7] Removes wildcard import Signed-off-by: Clay Downs --- .../opensearch/jobscheduler/spi/utils/LockServiceIT.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java index 106717d3..979ad81a 100644 --- a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java +++ b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java @@ -23,7 +23,11 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; From 2e5452fe7d4727d1d732dc857417bb36c309b040 Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Mon, 4 Apr 2022 22:26:51 +0000 Subject: [PATCH 6/7] Uses lockID in place of jobID Signed-off-by: Clay Downs --- .../jobscheduler/spi/LockModel.java | 41 ++----------------- .../jobscheduler/spi/utils/LockService.java | 35 ++++++++-------- .../jobscheduler/spi/utils/LockServiceIT.java | 26 ++++++------ 3 files changed, 35 insertions(+), 67 deletions(-) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java index c947274a..2ff8876a 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java @@ -43,7 +43,7 @@ public final class LockModel implements ToXContentObject { * @param primaryTerm primary term from OpenSearch document. */ public LockModel(final LockModel copyLock, long seqNo, long primaryTerm) { - this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, copyLock.lockTime, copyLock.lockDurationSeconds, + this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockTime, copyLock.lockDurationSeconds, copyLock.released, seqNo, primaryTerm); } @@ -54,7 +54,7 @@ public LockModel(final LockModel copyLock, long seqNo, long primaryTerm) { * @param released boolean flag to indicate if the lock is released */ public LockModel(final LockModel copyLock, final boolean released) { - this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, copyLock.lockTime, copyLock.lockDurationSeconds, + this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockTime, copyLock.lockDurationSeconds, released, copyLock.seqNo, copyLock.primaryTerm); } @@ -68,8 +68,7 @@ public LockModel(final LockModel copyLock, final boolean released) { */ public LockModel(final LockModel copyLock, final Instant updateLockTime, final long lockDurationSeconds, final boolean released) { - this(copyLock.jobIndexName, copyLock.jobId, copyLock.lockId, updateLockTime, lockDurationSeconds, released, - copyLock.seqNo, copyLock.primaryTerm); + this(copyLock.jobIndexName, copyLock.jobId, updateLockTime, lockDurationSeconds, released, copyLock.seqNo, copyLock.primaryTerm); } public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockDurationSeconds, boolean released) { @@ -77,11 +76,6 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockD SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); } - public LockModel(String jobIndexName, String jobId, String lockId, Instant lockTime, long lockDurationSeconds, boolean released) { - this(jobIndexName, jobId, lockId, lockTime, lockDurationSeconds, released, - SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - } - public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) { this.lockId = jobIndexName + LOCK_ID_DELIMITR + jobId; @@ -94,18 +88,6 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime, this.primaryTerm = primaryTerm; } - public LockModel(String jobIndexName, String jobId, String lockId, Instant lockTime, - long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) { - this.lockId = lockId; - this.jobIndexName = jobIndexName; - this.jobId = jobId; - this.lockTime = lockTime; - this.lockDurationSeconds = lockDurationSeconds; - this.released = released; - this.seqNo = seqNo; - this.primaryTerm = primaryTerm; - } - public static String generateLockId(String jobIndexName, String jobId) { return jobIndexName + LOCK_ID_DELIMITR + jobId; } @@ -153,23 +135,6 @@ public static LockModel parse(final XContentParser parser, long seqNo, long prim ); } - /* - * Parses the LockModel while also taking in the lockID, which will be the document ID of the lock being parsed. - */ - public static LockModel parseWithID(final XContentParser parser, long seqNo, long primaryTerm, String lockId) throws IOException { - LockModel lockNoID = parse(parser, seqNo, primaryTerm); - return new LockModel( - lockNoID.jobIndexName, - lockNoID.jobId, - requireNonNull(lockId, "LockId cannot be when explicitly provided in the parser"), - lockNoID.lockTime, - lockNoID.lockDurationSeconds, - lockNoID.released, - seqNo, - primaryTerm - ); - } - @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject() .field(JOB_INDEX_NAME, this.jobIndexName) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 0a82331c..9b135bb4 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -106,44 +106,46 @@ public void acquireLock(final ScheduledJobParameter jobParameter, final JobExecutionContext context, ActionListener listener) { final String jobIndexName = context.getJobIndexName(); final String jobId = context.getJobId(); - String lockId = LockModel.generateLockId(jobIndexName, jobId); - acquireLockWithId(jobParameter, context, lockId, listener); + final long lockDurationSeconds = jobParameter.getLockDurationSeconds(); + acquireLockWithId(jobIndexName, lockDurationSeconds, jobId, listener); } /** * Attempts to acquire a lock with a specific lock Id. If the lock does not exist it attempts to create the lock document. * If the Lock document exists, it will try to update and acquire the lock. * - * @param jobParameter a {@code ScheduledJobParameter} containing the lock duration. - * @param context a {@code JobExecutionContext} containing job index name and job id. + * @param jobIndexName a non-null job index name. + * @param lockDurationSeconds the amount of time in seconds that the lock should exist * @param lockId the unique Id for the lock. This should represent the resource that the lock is on, whether it be - * a job, or some other arbitrary resource. + * a job, or some other arbitrary resource. If the lockID matches a jobID, then the lock will be deleted + * when the job is deleted. * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the lock if it was acquired * or else null. Passes {@code IllegalArgumentException} to onFailure if the {@code ScheduledJobParameter} does not * have {@code LockDurationSeconds}. */ - public void acquireLockWithId(final ScheduledJobParameter jobParameter, - final JobExecutionContext context, + public void acquireLockWithId(final String jobIndexName, + final Long lockDurationSeconds, final String lockId, ActionListener listener) { - final String jobIndexName = context.getJobIndexName(); - final String jobId = context.getJobId(); - if (jobParameter.getLockDurationSeconds() == null) { + if (lockDurationSeconds == null) { listener.onFailure(new IllegalArgumentException("Job LockDuration should not be null")); + } else if (jobIndexName == null) { + listener.onFailure(new IllegalArgumentException("Job index name should not be null")); + } else if (lockId == null) { + listener.onFailure(new IllegalArgumentException("Lock ID should not be null")); } else { - final long lockDurationSecond = jobParameter.getLockDurationSeconds(); createLockIndex(ActionListener.wrap( created -> { if (created) { try { - findLock(lockId, ActionListener.wrap( + findLock(LockModel.generateLockId(jobIndexName, lockId), ActionListener.wrap( existingLock -> { if (existingLock != null) { if (isLockReleasedOrExpired(existingLock)) { // Lock is expired. Attempt to acquire lock. logger.debug("lock is released or expired: " + existingLock); LockModel updateLock = new LockModel(existingLock, getNow(), - lockDurationSecond, false); + lockDurationSeconds, false); updateLock(updateLock, listener); } else { logger.debug("Lock is NOT released or expired. " + existingLock); @@ -152,8 +154,9 @@ public void acquireLockWithId(final ScheduledJobParameter jobParameter, } } else { // There is no lock object and it is first time. Create new lock. - LockModel tempLock = new LockModel(jobIndexName, jobId, lockId, getNow(), - lockDurationSecond, false); + // Note that the lockID will be set to {jobIndexName}-{lockId} + LockModel tempLock = new LockModel(jobIndexName, lockId, getNow(), + lockDurationSeconds, false); logger.debug("Lock does not exist. Creating new lock" + tempLock); createLock(tempLock, listener); } @@ -249,7 +252,7 @@ private void findLock(final String lockId, ActionListener listener) { .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); parser.nextToken(); - listener.onResponse(LockModel.parseWithID(parser, response.getSeqNo(), response.getPrimaryTerm(), response.getId())); + listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); } catch (IOException e) { logger.error("IOException occurred finding lock", e); listener.onResponse(null); diff --git a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java index 979ad81a..20c0c293 100644 --- a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java +++ b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/LockServiceIT.java @@ -126,12 +126,12 @@ public void testSanityWithCustomLockID() throws Exception { lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); Instant testTime = Instant.now(); lockService.setTime(testTime); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock.", lock); - assertEquals("job_id does not match.", JOB_ID + uniqSuffix, lock.getJobId()); + assertEquals("job_id does not match.", lockID, lock.getJobId()); assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName()); - assertEquals("lock_id does not match.", lockID, lock.getLockId()); + assertEquals("lock_id does not match.", lock.getJobIndexName() + "-" + lockID, lock.getLockId()); assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds()); assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond()); assertFalse("Lock should not be released.", lock.isReleased()); @@ -163,10 +163,10 @@ public void testSecondAcquireLockFail() throws Exception { final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock2 -> { assertNull("Expected to failed to get lock.", lock2); lockService.release(lock, ActionListener.wrap( @@ -199,13 +199,13 @@ public void testLockReleasedAndAcquired() throws Exception { final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); lockService.release(lock, ActionListener.wrap( released -> { assertTrue("Failed to release lock.", released); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock2 -> { assertNotNull("Expected to successfully grab lock2", lock2); lockService.release(lock2, ActionListener.wrap( @@ -243,12 +243,12 @@ public void testLockExpired() throws Exception { final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); // Set lock back to current time to make the lock expire. lockService.setTime(null); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock2 -> { assertNotNull("Expected to successfully grab lock", lock2); lockService.release(lock, ActionListener.wrap( @@ -333,7 +333,7 @@ public void testMultiThreadCreateLock() throws Exception { final AtomicReference lockModelAtomicReference = new AtomicReference<>(null); Callable callable = () -> { CountDownLatch callableLatch = new CountDownLatch(1); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { if (lock != null) { lockModelAtomicReference.set(lock); @@ -406,7 +406,7 @@ public void testMultiThreadAcquireLock() throws Exception { if (created) { // Set lock time in the past. lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS))); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( createdLock -> { assertNotNull(createdLock); // Set lock back to current time to make the lock expire. @@ -416,7 +416,7 @@ public void testMultiThreadAcquireLock() throws Exception { final AtomicReference lockModelAtomicReference = new AtomicReference<>(null); Callable callable = () -> { CountDownLatch callableLatch = new CountDownLatch(1); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { if (lock != null) { lockModelAtomicReference.set(lock); @@ -477,7 +477,7 @@ public void testRenewLock() throws Exception { final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0), lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix); - lockService.acquireLockWithId(TEST_SCHEDULED_JOB_PARAM, context, lockID, ActionListener.wrap( + lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap( lock -> { assertNotNull("Expected to successfully grab lock", lock); // Set the time of LockService (the 'lockTime' of acquired locks) to a fixed time. From 5b48bc23bb7af21bb75fc292950203b085fd89c0 Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Mon, 4 Apr 2022 23:08:29 +0000 Subject: [PATCH 7/7] Adds comment Signed-off-by: Clay Downs --- .../main/java/org/opensearch/jobscheduler/spi/LockModel.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java index 2ff8876a..767d6054 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java @@ -80,6 +80,8 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime, long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) { this.lockId = jobIndexName + LOCK_ID_DELIMITR + jobId; this.jobIndexName = jobIndexName; + // The jobId parameter does not necessarily need to represent the id of a job scheduler job, as it is being used + // to scope the lock, and could represent any resource. this.jobId = jobId; this.lockTime = lockTime; this.lockDurationSeconds = lockDurationSeconds;