14 changes: 14 additions & 0 deletions pom.xml
@@ -53,6 +53,20 @@
</properties>

<dependencies>
<dependency>
<groupId>co.elastic.logging</groupId>
<artifactId>logback-ecs-encoder</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.11.0</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
10 changes: 10 additions & 0 deletions src/main/environment/common_ci.properties
@@ -126,3 +126,13 @@ spring.redis.host=@env.REDIS_HOST@
cors.allowed-origins=@env.CORS_ALLOWED_ORIGINS@

hipSystemUrl= @env.HIP_SYSTEM_URL@

# Elasticsearch Configuration
elasticsearch.host=@env.ELASTICSEARCH_HOST@
elasticsearch.port=@env.ELASTICSEARCH_PORT@
elasticsearch.username=@env.ELASTICSEARCH_USERNAME@
elasticsearch.password=@env.ELASTICSEARCH_PASSWORD@
elasticsearch.index.beneficiary=@env.ELASTICSEARCH_INDEX_BENEFICIARY@

# Enable/Disable ES (for gradual rollout)
elasticsearch.enabled=@env.ELASTICSEARCH_ENABLED@
11 changes: 11 additions & 0 deletions src/main/environment/common_docker.properties
@@ -125,3 +125,14 @@ spring.redis.host=${REDIS_HOST}


hipSystemUrl= ${HIP_SYSTEM_URL}

# Elasticsearch Configuration
elasticsearch.host=${ELASTICSEARCH_HOST}
elasticsearch.port=${ELASTICSEARCH_PORT}
elasticsearch.username=${ELASTICSEARCH_USERNAME}
elasticsearch.password=${ELASTICSEARCH_PASSWORD}
elasticsearch.index.beneficiary=${ELASTICSEARCH_INDEX_BENEFICIARY}

# Enable/Disable ES (for gradual rollout)
elasticsearch.enabled=${ELASTICSEARCH_ENABLED}

13 changes: 12 additions & 1 deletion src/main/environment/common_example.properties
@@ -119,4 +119,15 @@

cors.allowed-origins=http://localhost:*

hipSystemUrl= <Enter HIP request URL>
hipSystemUrl= <Enter HIP request URL>

# Elasticsearch Configuration
elasticsearch.host=localhost
elasticsearch.port=9200
elasticsearch.username=elastic
elasticsearch.password=piramalES

Check notice (Code scanning / SonarCloud): Credentials should not be hard-coded (Low).
Make sure these credentials get revoked, changed, and removed from the code.
elasticsearch.index.beneficiary=beneficiary_index

# Enable/Disable ES (for gradual rollout)
elasticsearch.enabled=true

2 changes: 2 additions & 0 deletions src/main/java/com/wipro/fhir/FhirApiApplication.java
@@ -30,10 +30,12 @@
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;

import com.wipro.fhir.data.users.User;

@SpringBootApplication
@EnableAsync
public class FhirApiApplication {

public static void main(String[] args) {
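Note on the @EnableAsync addition above: it turns on Spring's proxy-based handling of the @Async("esAsyncExecutor") methods this PR introduces in AbhaElasticsearchSyncService. A minimal illustrative sketch (ExampleBean is hypothetical and not part of this change) of what the proxy does and does not intercept:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class ExampleBean { // hypothetical, for illustration only

    @Async("esAsyncExecutor")
    public void doWork() {
        // Dispatched to an "es-sync-" pool thread when called through the injected Spring proxy.
    }

    public void trigger() {
        doWork(); // Self-invocation bypasses the proxy, so this call runs synchronously.
    }
}

Only public methods reached through the injected bean run asynchronously; private or self-invoked @Async methods (for example retryAfterDelay() in the new sync service) would not be proxied.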
104 changes: 104 additions & 0 deletions src/main/java/com/wipro/fhir/config/ElasticsearchConfig.java
@@ -0,0 +1,104 @@
package com.wipro.fhir.config;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class ElasticsearchConfig {

@Value("${elasticsearch.host}")
private String esHost;

@Value("${elasticsearch.port}")
private int esPort;

@Value("${elasticsearch.username}")
private String esUsername;

@Value("${elasticsearch.password}")
private String esPassword;

@Value("${elasticsearch.connection.timeout:10000}")
private int connectionTimeout;

@Value("${elasticsearch.socket.timeout:120000}")
private int socketTimeout;

@Value("${elasticsearch.max.connections:200}")
private int maxConnections;

@Value("${elasticsearch.max.connections.per.route:100}")
private int maxConnectionsPerRoute;

@Bean
public ElasticsearchClient elasticsearchClient() {
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(esUsername, esPassword)
);

RestClientBuilder builder = RestClient.builder(
new HttpHost(esHost, esPort, "http")
);

// Apply timeout configurations
builder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(connectionTimeout)
.setSocketTimeout(socketTimeout)
.setConnectionRequestTimeout(connectionTimeout)
);

// Apply connection pool settings
builder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setMaxConnTotal(maxConnections)
.setMaxConnPerRoute(maxConnectionsPerRoute)
.setDefaultIOReactorConfig(
IOReactorConfig.custom()
.setSoTimeout(socketTimeout)
.build()
)
);

RestClient restClient = builder.build();

ElasticsearchTransport transport = new RestClientTransport(
restClient,
new JacksonJsonpMapper()
);

return new ElasticsearchClient(transport);
}

@Bean(name = "esAsyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("es-sync-");
executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
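For reference, a minimal sketch of how the ElasticsearchClient bean defined above could be consumed from another Spring component. BeneficiarySearchExample, BeneficiaryDoc and the "healthID" field are illustrative assumptions derived from the fields synced in the service below, not types defined in this PR:

import java.io.IOException;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;

@Service
public class BeneficiarySearchExample { // hypothetical, for illustration only

    @Autowired
    private ElasticsearchClient esClient;

    @Value("${elasticsearch.index.beneficiary}")
    private String beneficiaryIndex;

    public List<Hit<BeneficiaryDoc>> findByHealthId(String healthId) throws IOException {
        // Fluent builder style of the 8.x elasticsearch-java client configured above.
        SearchResponse<BeneficiaryDoc> response = esClient.search(s -> s
                .index(beneficiaryIndex)
                .query(q -> q.match(m -> m.field("healthID").query(healthId))),
            BeneficiaryDoc.class);
        return response.hits().hits();
    }

    // Minimal document type so the sketch is self-contained; field names mirror the map
    // keys written by AbhaElasticsearchSyncService below.
    public static class BeneficiaryDoc {
        public String healthID;
        public String abhaID;
        public String abhaCreatedDate;
    }
}

The same injected client is what the new sync service uses for its get and update calls.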
160 changes: 160 additions & 0 deletions src/main/java/com/wipro/fhir/service/elasticsearch/AbhaElasticsearchSyncService.java
@@ -0,0 +1,160 @@
package com.wipro.fhir.service.elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.UpdateRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
* Lightweight ES sync service for FHIR-API
* Only updates ABHA-related fields without fetching full beneficiary data
*/
@Service
public class AbhaElasticsearchSyncService {

private static final Logger logger = LoggerFactory.getLogger(AbhaElasticsearchSyncService.class);

@Autowired
private ElasticsearchClient esClient;

@Value("${elasticsearch.index.beneficiary}")
private String beneficiaryIndex;

@Value("${elasticsearch.enabled}")
private boolean esEnabled;

private final ObjectMapper objectMapper = new ObjectMapper();

/**
* Update ABHA details in Elasticsearch after ABHA is created/updated
* This method updates only ABHA fields, doesn't require full beneficiary data
*/
@Async("esAsyncExecutor")
public void updateAbhaInElasticsearch(Long benRegId, String healthId, String healthIdNumber, String createdDate) {
if (!esEnabled) {
logger.debug("Elasticsearch is disabled, skipping ABHA sync");
return;
}

if (benRegId == null) {
logger.warn("benRegId is null, cannot sync ABHA to ES");
return;
}

int maxRetries = 3;
int retryDelay = 2000; // 2 seconds

for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
logger.info("Syncing ABHA details to ES for benRegId: {} (attempt {}/{})", benRegId, attempt, maxRetries);

Map<String, Object> abhaData = new HashMap<>();
abhaData.put("healthID", healthId);
abhaData.put("abhaID", healthIdNumber);
abhaData.put("abhaCreatedDate", createdDate);

String documentId = String.valueOf(benRegId);
boolean exists = checkDocumentExists(documentId);

if (exists) {
UpdateRequest<Object, Object> updateRequest = UpdateRequest.of(u -> u
.index(beneficiaryIndex)
.id(documentId)
.doc(abhaData)
.refresh(Refresh.True)
.docAsUpsert(false)
.retryOnConflict(3)
);

esClient.update(updateRequest, Object.class);
logger.info("Successfully updated ABHA in ES: benRegId={}", benRegId);
return;

} else {
logger.warn("Document not found in ES for benRegId={} (attempt {}/{})", benRegId, attempt, maxRetries);
if (attempt < maxRetries) {
Thread.sleep(retryDelay * attempt);
}
}

} catch (java.net.SocketTimeoutException e) {
logger.error("Timeout updating ABHA in ES for benRegId {} (attempt {}/{}): {}",
benRegId, attempt, maxRetries, e.getMessage());
if (attempt < maxRetries) {
try {
Thread.sleep(retryDelay * attempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
} catch (Exception e) {
logger.error("Error updating ABHA in ES for benRegId {} (attempt {}/{}): {}",
benRegId, attempt, maxRetries, e.getMessage());
if (attempt == maxRetries) {
logger.error("Failed to sync ABHA after {} attempts for benRegId {}", maxRetries, benRegId);
}
}
}
}

/**
* Check if document exists in ES
*/
private boolean checkDocumentExists(String documentId) {
try {
GetRequest getRequest = GetRequest.of(g -> g
.index(beneficiaryIndex)
.id(documentId)
);

GetResponse<Object> response = esClient.get(getRequest, Object.class);
return response.found();

} catch (Exception e) {
logger.debug("Document not found or error checking: {}", e.getMessage());
return false;
}
}

/**
* Retry sync after 5 seconds if document wasn't found
* (Handles race condition where ABHA is saved before beneficiary is synced to ES)
*/
@Async("esAsyncExecutor")
private void retryAfterDelay(Long benRegId, String healthId, String healthIdNumber, String createdDate) {
try {
Thread.sleep(5000); // Wait 5 seconds
logger.info("Retrying ABHA sync for benRegId: {}", benRegId);
updateAbhaInElasticsearch(benRegId, healthId, healthIdNumber, createdDate);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Retry interrupted for benRegId: {}", benRegId);
}
}

/**
* Update multiple ABHA addresses (comma-separated)
*/
@Async("esAsyncExecutor")
public void updateMultipleAbhaAddresses(Long benRegId, String commaSeparatedHealthIds,
String healthIdNumber, String createdDate) {
if (!esEnabled || benRegId == null) {
return;
}

// For multiple ABHA addresses, store as comma-separated string
updateAbhaInElasticsearch(benRegId, commaSeparatedHealthIds, healthIdNumber, createdDate);
}
}
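A minimal sketch of how this service would typically be invoked from the ABHA creation flow. AbhaCreationHandler and onAbhaCreated are hypothetical names used only to illustrate the wiring, not classes from this PR:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.wipro.fhir.service.elasticsearch.AbhaElasticsearchSyncService;

@Service
public class AbhaCreationHandler { // hypothetical caller, for illustration only

    @Autowired
    private AbhaElasticsearchSyncService abhaEsSyncService;

    public void onAbhaCreated(Long benRegId, String healthId, String healthIdNumber, String createdDate) {
        // Persist the ABHA details to the primary database first (omitted here).

        // Fire-and-forget ES sync: because updateAbhaInElasticsearch is @Async("esAsyncExecutor")
        // and is called through the injected Spring proxy, this call returns immediately and the
        // retry loop runs on the "es-sync-" thread pool.
        abhaEsSyncService.updateAbhaInElasticsearch(benRegId, healthId, healthIdNumber, createdDate);
    }
}

When elasticsearch.enabled is false the call remains safe: the service logs a debug message and returns without touching Elasticsearch.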