Skip to content

Commit 847be33

Browse files
authored
Use instance of LockService instantiated in JobScheduler through Guice (#677)
* WIP to show Geospatial plugin using LockService instance from JS Signed-off-by: Craig Perkins <cwperx@amazon.com> * Use instance of LockService from Guice that is instantiated by Job Scheduler Signed-off-by: Craig Perkins <cwperx@amazon.com> * Fix failing tests Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add to CHANGELOG Signed-off-by: Craig Perkins <cwperx@amazon.com> * Address comments Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add checks to see if initialized Signed-off-by: Craig Perkins <cwperx@amazon.com> * Remove constructor that accepts client Signed-off-by: Craig Perkins <cwperx@amazon.com> * Switch to package-private Signed-off-by: Craig Perkins <cwperx@amazon.com> * package-private Signed-off-by: Craig Perkins <cwperx@amazon.com> * public Signed-off-by: Craig Perkins <cwperx@amazon.com> --------- Signed-off-by: Craig Perkins <cwperx@amazon.com>
1 parent ba4e8c1 commit 847be33

File tree

7 files changed

+126
-26
lines changed

7 files changed

+126
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on ho
2626
### Documentation
2727
### Maintenance
2828
### Refactoring
29+
- Use instance of LockService instantiated in JobScheduler through Guice ([#677](https://github.com/opensearch-project/geospatial/pull/677))

src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import java.util.concurrent.atomic.AtomicReference;
1515

1616
import org.opensearch.OpenSearchException;
17-
import org.opensearch.client.Client;
1817
import org.opensearch.cluster.service.ClusterService;
1918
import org.opensearch.core.action.ActionListener;
2019
import org.opensearch.jobscheduler.spi.LockModel;
@@ -30,17 +29,19 @@ public class Ip2GeoLockService {
3029
public static final long LOCK_DURATION_IN_SECONDS = 300l;
3130
public static final long RENEW_AFTER_IN_SECONDS = 120l;
3231
private final ClusterService clusterService;
33-
private final LockService lockService;
32+
private LockService lockService;
3433

3534
/**
3635
* Constructor
3736
*
3837
* @param clusterService the cluster service
39-
* @param client the client
4038
*/
41-
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
39+
public Ip2GeoLockService(final ClusterService clusterService) {
4240
this.clusterService = clusterService;
43-
this.lockService = new LockService(client, clusterService);
41+
}
42+
43+
public void initialize(final LockService lockService) {
44+
this.lockService = lockService;
4445
}
4546

4647
/**
@@ -54,6 +55,9 @@ public Ip2GeoLockService(final ClusterService clusterService, final Client clien
5455
* @param listener the listener
5556
*/
5657
public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
58+
if (lockService == null) {
59+
throw new OpenSearchException("Ip2GeoLockService is not initialized");
60+
}
5761
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener);
5862
}
5963

@@ -65,6 +69,9 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco
6569
* @return lock model
6670
*/
6771
public Optional<LockModel> acquireLock(final String datasourceName, final Long lockDurationSeconds) {
72+
if (lockService == null) {
73+
throw new OpenSearchException("Ip2GeoLockService is not initialized");
74+
}
6875
AtomicReference<LockModel> lockReference = new AtomicReference();
6976
CountDownLatch countDownLatch = new CountDownLatch(1);
7077
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, new ActionListener<>() {
@@ -95,6 +102,9 @@ public void onFailure(final Exception e) {
95102
* @param lockModel the lock model
96103
*/
97104
public void releaseLock(final LockModel lockModel) {
105+
if (lockService == null) {
106+
throw new OpenSearchException("Ip2GeoLockService is not initialized");
107+
}
98108
lockService.release(
99109
lockModel,
100110
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
@@ -108,6 +118,9 @@ public void releaseLock(final LockModel lockModel) {
108118
* @return renewed lock if renew succeed and null otherwise
109119
*/
110120
public LockModel renewLock(final LockModel lockModel) {
121+
if (lockService == null) {
122+
throw new OpenSearchException("Ip2GeoLockService is not initialized");
123+
}
111124
AtomicReference<LockModel> lockReference = new AtomicReference();
112125
CountDownLatch countDownLatch = new CountDownLatch(1);
113126
lockService.renewLock(lockModel, new ActionListener<>() {

src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
import org.opensearch.action.ActionRequest;
1717
import org.opensearch.client.Client;
1818
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
19+
import org.opensearch.cluster.node.DiscoveryNode;
1920
import org.opensearch.cluster.node.DiscoveryNodes;
2021
import org.opensearch.cluster.service.ClusterService;
2122
import org.opensearch.common.collect.MapBuilder;
23+
import org.opensearch.common.inject.Inject;
24+
import org.opensearch.common.lifecycle.Lifecycle;
2225
import org.opensearch.common.lifecycle.LifecycleComponent;
26+
import org.opensearch.common.lifecycle.LifecycleListener;
2327
import org.opensearch.common.settings.ClusterSettings;
2428
import org.opensearch.common.settings.IndexScopedSettings;
2529
import org.opensearch.common.settings.Setting;
@@ -75,7 +79,9 @@
7579
import org.opensearch.index.mapper.Mapper;
7680
import org.opensearch.indices.SystemIndexDescriptor;
7781
import org.opensearch.ingest.Processor;
82+
import org.opensearch.jobscheduler.spi.utils.LockService;
7883
import org.opensearch.plugins.ActionPlugin;
84+
import org.opensearch.plugins.ClusterPlugin;
7985
import org.opensearch.plugins.IngestPlugin;
8086
import org.opensearch.plugins.MapperPlugin;
8187
import org.opensearch.plugins.Plugin;
@@ -96,11 +102,22 @@
96102
* to interact with Cluster.
97103
*/
98104
@Log4j2
99-
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin {
105+
public class GeospatialPlugin extends Plugin
106+
implements
107+
IngestPlugin,
108+
ActionPlugin,
109+
MapperPlugin,
110+
SearchPlugin,
111+
SystemIndexPlugin,
112+
ClusterPlugin {
100113
private Ip2GeoCachedDao ip2GeoCachedDao;
101114
private DatasourceDao datasourceDao;
102115
private GeoIpDataDao geoIpDataDao;
103116
private URLDenyListChecker urlDenyListChecker;
117+
private ClusterService clusterService;
118+
private Ip2GeoLockService ip2GeoLockService;
119+
private Ip2GeoExecutor ip2GeoExecutor;
120+
private DatasourceUpdateService datasourceUpdateService;
104121

105122
@Override
106123
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
@@ -129,7 +146,10 @@ public void onIndexModule(IndexModule indexModule) {
129146

130147
@Override
131148
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
132-
return List.of(Ip2GeoListener.class);
149+
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(2);
150+
services.add(Ip2GeoListener.class);
151+
services.add(GuiceHolder.class);
152+
return services;
133153
}
134154

135155
@Override
@@ -158,20 +178,10 @@ public Collection<Object> createComponents(
158178
IndexNameExpressionResolver indexNameExpressionResolver,
159179
Supplier<RepositoriesService> repositoriesServiceSupplier
160180
) {
161-
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
162-
clusterService,
163-
datasourceDao,
164-
geoIpDataDao,
165-
urlDenyListChecker
166-
);
167-
Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
168-
Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client);
169-
/**
170-
* We don't need to return datasource runner because it is used only by job scheduler and job scheduler
171-
* does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance.
172-
*/
173-
DatasourceRunner.getJobRunnerInstance()
174-
.initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceDao, ip2GeoLockService);
181+
this.clusterService = clusterService;
182+
this.datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceDao, geoIpDataDao, urlDenyListChecker);
183+
this.ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
184+
this.ip2GeoLockService = new Ip2GeoLockService(clusterService);
175185

176186
return List.of(
177187
UploadStats.getInstance(),
@@ -265,4 +275,48 @@ public List<AggregationSpec> getAggregations() {
265275

266276
return List.of(geoHexGridSpec);
267277
}
278+
279+
@Override
280+
public void onNodeStarted(DiscoveryNode localNode) {
281+
LockService lockService = GuiceHolder.getLockService();
282+
ip2GeoLockService.initialize(lockService);
283+
284+
DatasourceRunner.getJobRunnerInstance()
285+
.initialize(this.clusterService, this.datasourceUpdateService, this.ip2GeoExecutor, this.datasourceDao, this.ip2GeoLockService);
286+
}
287+
288+
public static class GuiceHolder implements LifecycleComponent {
289+
290+
private static LockService lockService;
291+
292+
@Inject
293+
public GuiceHolder(final LockService lockService) {
294+
GuiceHolder.lockService = lockService;
295+
}
296+
297+
static LockService getLockService() {
298+
return lockService;
299+
}
300+
301+
@Override
302+
public void close() {}
303+
304+
@Override
305+
public Lifecycle.State lifecycleState() {
306+
return null;
307+
}
308+
309+
@Override
310+
public void addLifecycleListener(LifecycleListener listener) {}
311+
312+
@Override
313+
public void removeLifecycleListener(LifecycleListener listener) {}
314+
315+
@Override
316+
public void start() {}
317+
318+
@Override
319+
public void stop() {}
320+
321+
}
268322
}

src/test/java/org/opensearch/geospatial/ClusterSettingHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.opensearch.common.network.NetworkModule;
2525
import org.opensearch.common.settings.Settings;
2626
import org.opensearch.env.Environment;
27-
import org.opensearch.geospatial.plugin.GeospatialPlugin;
2827
import org.opensearch.node.MockNode;
2928
import org.opensearch.node.Node;
3029
import org.opensearch.plugins.Plugin;
@@ -49,7 +48,7 @@ private List<Class<? extends Plugin>> basePlugins() {
4948
List<Class<? extends Plugin>> plugins = new ArrayList<>();
5049
plugins.add(getTestTransportPlugin());
5150
plugins.add(MockHttpTransport.TestPlugin.class);
52-
plugins.add(GeospatialPlugin.class);
51+
plugins.add(TestGeospatialPlugin.class);
5352
return plugins;
5453
}
5554

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial;
7+
8+
import java.util.ArrayList;
9+
import java.util.Collection;
10+
import java.util.List;
11+
12+
import org.opensearch.common.lifecycle.LifecycleComponent;
13+
import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener;
14+
import org.opensearch.geospatial.plugin.GeospatialPlugin;
15+
16+
/**
17+
* This class is needed for ClusterSettingsHelper.createMockNode to instantiate a test instance of the
18+
* GeospatialPlugin without the JobSchedulerPlugin installed. Without overriding this class, the
19+
* GeospatialPlugin would try to Inject JobScheduler's LockService in the GuiceHolder which will
20+
* fail because JobScheduler is not installed
21+
*/
22+
public class TestGeospatialPlugin extends GeospatialPlugin {
23+
@Override
24+
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
25+
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
26+
services.add(Ip2GeoListener.class);
27+
return services;
28+
}
29+
}

src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,19 @@
2121
import org.opensearch.geospatial.GeospatialTestHelper;
2222
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
2323
import org.opensearch.jobscheduler.spi.LockModel;
24+
import org.opensearch.jobscheduler.spi.utils.LockService;
2425

2526
public class Ip2GeoLockServiceTests extends Ip2GeoTestCase {
2627
private Ip2GeoLockService ip2GeoLockService;
2728
private Ip2GeoLockService noOpsLockService;
2829

2930
@Before
3031
public void init() {
31-
ip2GeoLockService = new Ip2GeoLockService(clusterService, verifyingClient);
32-
noOpsLockService = new Ip2GeoLockService(clusterService, client);
32+
ip2GeoLockService = new Ip2GeoLockService(clusterService);
33+
noOpsLockService = new Ip2GeoLockService(clusterService);
34+
// TODO Remove direct instantiation and offer a TestLockService class to plugins
35+
ip2GeoLockService.initialize(new LockService(verifyingClient, clusterService));
36+
noOpsLockService.initialize(new LockService(client, clusterService));
3337
}
3438

3539
public void testAcquireLock_whenValidInput_thenSucceed() {

src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public void testCreateComponents() {
167167
}
168168

169169
public void testGetGuiceServiceClasses() {
170-
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class);
170+
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class, GeospatialPlugin.GuiceHolder.class);
171171
assertEquals(classes, plugin.getGuiceServiceClasses());
172172
}
173173

0 commit comments

Comments
 (0)