Skip to content

Commit

Permalink
Merge pull request #177 from Olog/add-index-creation-timeout
Browse files Browse the repository at this point in the history
add configurability for index creation timeouts
  • Loading branch information
shroffk authored Oct 12, 2023
2 parents e44a54f + 58cbc4b commit 397ca55
Showing 1 changed file with 62 additions and 26 deletions.
88 changes: 62 additions & 26 deletions src/main/java/org/phoebus/olog/ElasticConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
Expand Down Expand Up @@ -46,7 +47,11 @@ public class ElasticConfig {

private static final Logger logger = Logger.getLogger(ElasticConfig.class.getName());

// Read the elatic index and type from the application.properties
// Read the elastic index and type from the application.properties
@Value("${elasticsearch.index.create.timeout:30s}")
private String ES_INDEX_CREATE_TIMEOUT;
@Value("${elasticsearch.index.create.master_timeout:30s}")
private String ES_INDEX_CREATE_MASTER_TIMEOUT;
@Value("${elasticsearch.tag.index:olog_tags}")
private String ES_TAG_INDEX;
@Value("${elasticsearch.logbook.index:olog_logbooks}")
Expand All @@ -60,7 +65,6 @@ public class ElasticConfig {
@Value("${elasticsearch.log.archive.index:olog_archived_logs}")
private String ES_LOG_ARCHIVE_INDEX;


@Value("${elasticsearch.cluster.name:elasticsearch}")
private String clusterName;
@Value("${elasticsearch.network.host:localhost}")
Expand All @@ -82,6 +86,28 @@ public class ElasticConfig {
private ElasticsearchClient client;
private static final AtomicBoolean esInitialized = new AtomicBoolean();

private CreateIndexRequest.Builder withTimeouts(CreateIndexRequest.Builder builder) {
return builder
.timeout(timeBuilder ->
timeBuilder.time(ES_INDEX_CREATE_TIMEOUT)
).masterTimeout( timeBuilder ->
timeBuilder.time(ES_INDEX_CREATE_MASTER_TIMEOUT)
);
}

private void logCreateIndexRequest(CreateIndexRequest request) {
logger.log(Level.INFO, String.format(
"CreateIndexRequest: " +
"index: %s, " +
"timeout: %s, " +
"masterTimeout: %s, " +
"waitForActiveShards: %s",
request.index(),
request.timeout() != null ? request.timeout().time() : null,
request.masterTimeout() != null ? request.masterTimeout().time() : null,
request.waitForActiveShards() != null ? request.waitForActiveShards()._toJsonString() : null
));
}
@Bean({"client"})
public ElasticsearchClient getClient() {
if (client == null) {
Expand Down Expand Up @@ -111,12 +137,16 @@ void elasticIndexValidation(ElasticsearchClient client) {

// Olog Sequence Index
try (InputStream is = ElasticConfig.class.getResourceAsStream("/seq_mapping.json")) {
BooleanResponse exits = client.indices().exists(ExistsRequest.of(e -> e.index(ES_SEQ_INDEX)));
if(!exits.value()) {

BooleanResponse exists = client.indices().exists(ExistsRequest.of(e -> e.index(ES_SEQ_INDEX)));
if(!exists.value()) {
CreateIndexRequest request = CreateIndexRequest.of(
c -> withTimeouts(c).index(ES_SEQ_INDEX)
.withJson(is)
);
logCreateIndexRequest(request);
CreateIndexResponse result = client.indices().create(
CreateIndexRequest.of(
c -> c.index(ES_SEQ_INDEX).withJson(is)));
request
);
logger.info("Created index: " + ES_SEQ_INDEX + " : acknowledged " + result.acknowledged());
}
} catch (IOException e) {
Expand All @@ -127,10 +157,11 @@ void elasticIndexValidation(ElasticsearchClient client) {
try (InputStream is = ElasticConfig.class.getResourceAsStream("/logbook_mapping.json")) {
BooleanResponse exits = client.indices().exists(ExistsRequest.of(e -> e.index(ES_LOGBOOK_INDEX)));
if(!exits.value()) {

CreateIndexResponse result = client.indices().create(
CreateIndexRequest.of(
c -> c.index(ES_LOGBOOK_INDEX).withJson(is)));
CreateIndexRequest request = CreateIndexRequest.of(
c -> withTimeouts(c).index(ES_LOGBOOK_INDEX).withJson(is)
);
logCreateIndexRequest(request);
CreateIndexResponse result = client.indices().create(request);
logger.info("Created index: " + ES_LOGBOOK_INDEX + " : acknowledged " + result.acknowledged());
}
} catch (IOException e) {
Expand All @@ -141,10 +172,11 @@ void elasticIndexValidation(ElasticsearchClient client) {
try (InputStream is = ElasticConfig.class.getResourceAsStream("/tag_mapping.json")) {
BooleanResponse exits = client.indices().exists(ExistsRequest.of(e -> e.index(ES_TAG_INDEX)));
if(!exits.value()) {

CreateIndexResponse result = client.indices().create(
CreateIndexRequest.of(
c -> c.index(ES_TAG_INDEX).withJson(is)));
CreateIndexRequest request = CreateIndexRequest.of(
c -> withTimeouts(c).index(ES_TAG_INDEX).withJson(is)
);
logCreateIndexRequest(request);
CreateIndexResponse result = client.indices().create(request);
logger.info("Created index: " + ES_TAG_INDEX + " : acknowledged " + result.acknowledged());
}
} catch (IOException e) {
Expand All @@ -155,10 +187,11 @@ void elasticIndexValidation(ElasticsearchClient client) {
try (InputStream is = ElasticConfig.class.getResourceAsStream("/property_mapping.json")) {
BooleanResponse exits = client.indices().exists(ExistsRequest.of(e -> e.index(ES_PROPERTY_INDEX)));
if(!exits.value()) {

CreateIndexResponse result = client.indices().create(
CreateIndexRequest.of(
c -> c.index(ES_PROPERTY_INDEX).withJson(is)));
CreateIndexRequest request = CreateIndexRequest.of(
c -> withTimeouts(c).index(ES_PROPERTY_INDEX).withJson(is)
);
logCreateIndexRequest(request);
CreateIndexResponse result = client.indices().create(request);
logger.info("Created index: " + ES_PROPERTY_INDEX + " : acknowledged " + result.acknowledged());
}
} catch (IOException e) {
Expand All @@ -169,10 +202,11 @@ void elasticIndexValidation(ElasticsearchClient client) {
try (InputStream is = ElasticConfig.class.getResourceAsStream("/log_entry_mapping.json")) {
BooleanResponse exits = client.indices().exists(ExistsRequest.of(e -> e.index(ES_LOG_INDEX)));
if(!exits.value()) {

CreateIndexResponse result = client.indices().create(
CreateIndexRequest.of(
c -> c.index(ES_LOG_INDEX).withJson(is)));
CreateIndexRequest request = CreateIndexRequest.of(
c -> withTimeouts(c).index(ES_LOG_INDEX).withJson(is)
);
logCreateIndexRequest(request);
CreateIndexResponse result = client.indices().create(request);
logger.info("Created index: " + ES_LOG_INDEX + " : acknowledged " + result.acknowledged());
}
} catch (IOException e) {
Expand All @@ -182,9 +216,11 @@ void elasticIndexValidation(ElasticsearchClient client) {
try (InputStream is = ElasticConfig.class.getResourceAsStream("/log_entry_mapping.json")) {
BooleanResponse exits = client.indices().exists(ExistsRequest.of(e -> e.index(ES_LOG_ARCHIVE_INDEX)));
if (!exits.value()) {
CreateIndexResponse result = client.indices().create(
CreateIndexRequest.of(
c -> c.index(ES_LOG_ARCHIVE_INDEX).withJson(is)));
CreateIndexRequest request = CreateIndexRequest.of(
c -> withTimeouts(c).index(ES_LOG_ARCHIVE_INDEX).withJson(is)
);
logCreateIndexRequest(request);
CreateIndexResponse result = client.indices().create(request);
logger.info("Created index: " + "archived_" + ES_LOG_ARCHIVE_INDEX + " : acknowledged " + result.acknowledged());
}
} catch (IOException e) {
Expand Down

0 comments on commit 397ca55

Please sign in to comment.