diff --git a/pom.xml b/pom.xml index 27341f1..57f7464 100644 --- a/pom.xml +++ b/pom.xml @@ -76,16 +76,6 @@ org.springframework.boot spring-boot-starter-data-elasticsearch - - co.elastic.clients - elasticsearch-java - 8.11.0 - - - - org.springframework.boot - spring-boot-starter-data-elasticsearch - co.elastic.clients elasticsearch-java diff --git a/src/main/java/com/iemr/common/identity/config/ElasticsearchSyncConfig.java b/src/main/java/com/iemr/common/identity/config/ElasticsearchSyncConfig.java index bb72f91..ce6f5da 100644 --- a/src/main/java/com/iemr/common/identity/config/ElasticsearchSyncConfig.java +++ b/src/main/java/com/iemr/common/identity/config/ElasticsearchSyncConfig.java @@ -25,11 +25,13 @@ public Executor elasticsearchSyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Only 1-2 sync jobs should run at a time to avoid overwhelming DB/ES - executor.setCorePoolSize(5); - executor.setMaxPoolSize(10); + executor.setCorePoolSize(2); + executor.setMaxPoolSize(4); executor.setQueueCapacity(100); executor.setThreadNamePrefix("es-sync-"); executor.setKeepAliveSeconds(60); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); // Handle rejected tasks executor.setRejectedExecutionHandler((r, executor1) -> { diff --git a/src/main/java/com/iemr/common/identity/controller/elasticsearch/ElasticsearchSyncController.java b/src/main/java/com/iemr/common/identity/controller/elasticsearch/ElasticsearchSyncController.java index b8ed204..8c00730 100644 --- a/src/main/java/com/iemr/common/identity/controller/elasticsearch/ElasticsearchSyncController.java +++ b/src/main/java/com/iemr/common/identity/controller/elasticsearch/ElasticsearchSyncController.java @@ -45,7 +45,7 @@ public class ElasticsearchSyncController { * Start async full sync (RECOMMENDED for millions of records) * Returns immediately with job ID for tracking * - * Usage: POST http://localhost:8080/elasticsearch/sync/start + * Usage: POST http://localhost:8080/elasticsearch/start */ @PostMapping("/start") public ResponseEntity> startAsyncFullSync( @@ -83,7 +83,7 @@ public ResponseEntity> startAsyncFullSync( /** * Get job status by ID * - * Usage: GET http://localhost:8080/elasticsearch/sync/status/1 + * Usage: GET http://localhost:8080/elasticsearch/status/1 */ @GetMapping("/status/{jobId}") public ResponseEntity> getAsyncJobStatus(@PathVariable Long jobId) { @@ -120,7 +120,7 @@ public ResponseEntity> getAsyncJobStatus(@PathVariable Long /** * Get all active jobs * - * Usage: GET http://localhost:8080/elasticsearch/sync/active + * Usage: GET http://localhost:8080/elasticsearch/active */ @GetMapping("/active") public ResponseEntity> getActiveJobs() { @@ -131,7 +131,7 @@ public ResponseEntity> getActiveJobs() { /** * Get recent jobs * - * Usage: GET http://localhost:8080/elasticsearch/sync/recent + * Usage: GET http://localhost:8080/elasticsearch/recent */ @GetMapping("/recent") public ResponseEntity> getRecentJobs() { @@ -142,7 +142,7 @@ public ResponseEntity> getRecentJobs() { /** * Resume a failed job * - * Usage: POST http://localhost:8080/elasticsearch/sync/resume/1 + * Usage: POST http://localhost:8080/elasticsearch/resume/1 */ @PostMapping("/resume/{jobId}") public ResponseEntity> resumeJob( @@ -173,7 +173,7 @@ public ResponseEntity> resumeJob( /** * Cancel a running job * - * Usage: POST http://localhost:8080/elasticsearch/sync/cancel/1 + * Usage: POST http://localhost:8080/elasticsearch/cancel/1 */ @PostMapping("/cancel/{jobId}") public ResponseEntity> cancelJob(@PathVariable Long jobId) { @@ -194,10 +194,10 @@ public ResponseEntity> cancelJob(@PathVariable Long jobId) { } /** - * LEGACY: Synchronous full sync (NOT recommended for large datasets) + * LEGACY: Synchronous full sync(NOT recommended for large datasets) * Use /start instead * - * Usage: POST http://localhost:8080/elasticsearch/sync/all + * Usage: POST http://localhost:8080/elasticsearch/all */ @PostMapping("/all") public ResponseEntity> syncAllBeneficiaries() { @@ -232,7 +232,7 @@ public ResponseEntity> syncAllBeneficiaries() { /** * Sync a single beneficiary by BenRegId * - * Usage: POST http://localhost:8080/elasticsearch/sync/single/123456 + * Usage: POST http://localhost:8080/elasticsearch/single/123456 */ @PostMapping("/single/{benRegId}") public ResponseEntity> syncSingleBeneficiary( @@ -268,7 +268,7 @@ public ResponseEntity> syncSingleBeneficiary( /** * Check sync status - compare DB count vs ES count * - * Usage: GET http://localhost:8080/elasticsearch/sync/status + * Usage: GET http://localhost:8080/elasticsearch/status */ @GetMapping("/status") public ResponseEntity checkSyncStatus() { @@ -289,7 +289,7 @@ public ResponseEntity checkSyncStatus() { /** * Health check endpoint * - * Usage: GET http://localhost:8080/elasticsearch/sync/health + * Usage: GET http://localhost:8080/elasticsearch/health */ @GetMapping("/health") public ResponseEntity> healthCheck() { @@ -304,7 +304,7 @@ public ResponseEntity> healthCheck() { /** * Debug endpoint to check if a beneficiary exists in database * - * Usage: GET http://localhost:8080/elasticsearch/sync/debug/check/123456 + * Usage: GET http://localhost:8080/elasticsearch/debug/check/123456 */ @GetMapping("/debug/check/{benRegId}") public ResponseEntity> checkBeneficiaryExists( diff --git a/src/main/java/com/iemr/common/identity/data/elasticsearch/BeneficiaryDocument.java b/src/main/java/com/iemr/common/identity/data/elasticsearch/BeneficiaryDocument.java index d8b3b01..aad9011 100644 --- a/src/main/java/com/iemr/common/identity/data/elasticsearch/BeneficiaryDocument.java +++ b/src/main/java/com/iemr/common/identity/data/elasticsearch/BeneficiaryDocument.java @@ -58,6 +58,9 @@ public class BeneficiaryDocument { @JsonProperty("abhaID") private String abhaID; + @JsonProperty("abhaCreatedDate") + private String abhaCreatedDate; + @JsonProperty("familyID") private String familyID; diff --git a/src/main/java/com/iemr/common/identity/dto/BeneficiariesESDTO.java b/src/main/java/com/iemr/common/identity/dto/BeneficiariesESDTO.java index 969f8af..6238e64 100644 --- a/src/main/java/com/iemr/common/identity/dto/BeneficiariesESDTO.java +++ b/src/main/java/com/iemr/common/identity/dto/BeneficiariesESDTO.java @@ -77,6 +77,9 @@ public class BeneficiariesESDTO { @JsonProperty("abhaID") private String abhaID; + @JsonProperty("abhaCreatedDate") + private String abhaCreatedDate; + @JsonProperty("familyID") private String familyID; diff --git a/src/main/java/com/iemr/common/identity/repo/BenMappingRepo.java b/src/main/java/com/iemr/common/identity/repo/BenMappingRepo.java index 33db1d9..9c20f49 100644 --- a/src/main/java/com/iemr/common/identity/repo/BenMappingRepo.java +++ b/src/main/java/com/iemr/common/identity/repo/BenMappingRepo.java @@ -180,7 +180,8 @@ MBeneficiarymapping getWithVanSerialNoVanID(@Param("vanSerialNo") BigInteger van "UNIX_TIMESTAMP(m.LastModDate) * 1000, " + // 13 "m.BenAccountID, " + // 14 "contact.PreferredPhoneNum, " + // 15 - "h.HealthID, " + "h.HealthIDNumber, " + "fam.BenFamilyMapId, " + + // "h.HealthID, " + "h.HealthIDNumber, " + + "fam.BenFamilyMapId, " + "addr.CurrStateId, " + // 19 "addr.CurrState, " + // 20 "addr.CurrDistrictId, " + // 21 diff --git a/src/main/java/com/iemr/common/identity/repo/V_BenAdvanceSearchRepo.java b/src/main/java/com/iemr/common/identity/repo/V_BenAdvanceSearchRepo.java index 0be61bb..a7fd5f2 100644 --- a/src/main/java/com/iemr/common/identity/repo/V_BenAdvanceSearchRepo.java +++ b/src/main/java/com/iemr/common/identity/repo/V_BenAdvanceSearchRepo.java @@ -46,4 +46,9 @@ public interface V_BenAdvanceSearchRepo extends CrudRepository getBenRegIDByHealthIDNoAbhaIdNo(@Param("healthIDNo") String healthIDNo); + //Batch fetch ABHA details for multiple beneficiaries + @Query(nativeQuery = true, value = "SELECT BeneficiaryRegID, HealthID, HealthIDNumber, AuthenticationMode, CreatedDate" + + " FROM db_iemr.m_benhealthidmapping WHERE BeneficiaryRegID IN :benRegIDs ORDER BY BeneficiaryRegID, CreatedDate DESC") + List getBenAbhaDetailsByBenRegIDs(@Param("benRegIDs") List benRegIDs); + } diff --git a/src/main/java/com/iemr/common/identity/service/IdentityService.java b/src/main/java/com/iemr/common/identity/service/IdentityService.java index d107ae1..6db2101 100644 --- a/src/main/java/com/iemr/common/identity/service/IdentityService.java +++ b/src/main/java/com/iemr/common/identity/service/IdentityService.java @@ -95,7 +95,7 @@ import com.iemr.common.identity.service.elasticsearch.ElasticsearchService; import com.iemr.common.identity.utils.mapper.OutputMapper; import com.iemr.common.identity.utils.response.OutputResponse; - +import org.springframework.scheduling.annotation.Async; import org.springframework.beans.factory.annotation.Value; import com.iemr.common.identity.service.elasticsearch.BeneficiaryElasticsearchIndexUpdater; @@ -998,6 +998,9 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields MBeneficiarymapping benMapping = mappingRepo.findByBenRegIdOrderByBenMapIdAsc(identity.getBeneficiaryRegId()); + // Track if ANY change happened (determines if ES sync is needed) + boolean isDataUpdate = false; + // change in self details is implement here and other details here logger.debug("identity.getChangeInSelfDetails = " + identity.getChangeInSelfDetails()); logger.debug("identity.getChangeInOtherDetails = " + identity.getChangeInOtherDetails()); @@ -1066,6 +1069,7 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields mbDetl.setEmergencyRegistration(false); } detailRepo.save(mbDetl); + isDataUpdate = true; } } // edition in current emergency and permanent is implement below @@ -1092,6 +1096,7 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields */ logger.debug("Beneficiary address to update = " + OutputMapper.gson().toJson(mbAddr)); addressRepo.save(mbAddr); + isDataUpdate = true; } // edition in beneficiary contacts is updated here @@ -1118,6 +1123,8 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields */ logger.debug("Beneficiary contact to update = " + OutputMapper.gson().toJson(benCon)); contactRepo.save(benCon); + isDataUpdate = true; + } // change in identities are added here @@ -1160,6 +1167,8 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields } index++; + isDataUpdate = true; + } } @@ -1204,6 +1213,8 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields } index++; + isDataUpdate = true; + } } @@ -1230,6 +1241,8 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields */ logger.debug("Account to upsert = " + OutputMapper.gson().toJson(beneficiaryAccount)); accountRepo.save(beneficiaryAccount); + isDataUpdate = true; + } if (Boolean.TRUE.equals(identity.getChangeInBenImage())) { @@ -1254,15 +1267,20 @@ public void editIdentity(IdentityEditDTO identity) throws MissingMandatoryFields logger.debug("Image to upsert = " + OutputMapper.gson().toJson(beneficiaryImage)); beneficiaryImage.setProcessed("N"); imageRepo.save(beneficiaryImage); + isDataUpdate = true; + } // Trigger async sync to Elasticsearch - if (identity.getBeneficiaryRegId() != null) { + if (isDataUpdate && identity.getBeneficiaryRegId() != null) { logger.info("Triggering Elasticsearch sync for benRegId: {}", identity.getBeneficiaryRegId()); syncService.syncBeneficiaryAsync(identity.getBeneficiaryRegId()); } - logger.info("IdentityService.editIdentity - end. id = " + benMapping.getBenMapId()); - } + + logger.info("IdentityService.editIdentity - end. id = " + benMapping.getBenMapId()); +} + + private MBeneficiarydetail convertIdentityEditDTOToMBeneficiarydetail(IdentityEditDTO dto) { MBeneficiarydetail beneficiarydetail = new MBeneficiarydetail(); @@ -2142,5 +2160,7 @@ public Long checkBenIDAvailabilityLocal() { return regIdRepo.countByProvisioned(false); } + + } diff --git a/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryDataService.java b/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryDataService.java deleted file mode 100644 index 226ff7a..0000000 --- a/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryDataService.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.iemr.common.identity.service.elasticsearch; - -import java.math.BigInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import com.iemr.common.identity.domain.MBeneficiarymapping; -import com.iemr.common.identity.dto.BenDetailDTO; -import com.iemr.common.identity.dto.BeneficiariesDTO; -import com.iemr.common.identity.repo.BenMappingRepo; - -/** - * Service to fetch beneficiary data directly from database - * Used for Elasticsearch sync to avoid circular dependencies - */ -@Service -public class BeneficiaryDataService { - - private static final Logger logger = LoggerFactory.getLogger(BeneficiaryDataService.class); - - @Autowired - private BenMappingRepo mappingRepo; - - /** - * Fetch beneficiary data directly from database by benRegId - * This bypasses any Elasticsearch caching to get fresh database data - * - * @param benRegId - * @return - */ - public BeneficiariesDTO getBeneficiaryFromDatabase(BigInteger benRegId) { - int maxRetries = 3; - int retryCount = 0; - - while (retryCount < maxRetries) { - try { - logger.debug("Fetching beneficiary from database: benRegId={}, attempt={}", benRegId, retryCount + 1); - - MBeneficiarymapping mapping = mappingRepo.findByBenRegIdWithDetails(benRegId); - - if (mapping == null) { - logger.warn("Beneficiary mapping not found: benRegId={}", benRegId); - return null; - } - - BeneficiariesDTO dto = convertToDTO(mapping); - - logger.debug("Successfully fetched beneficiary: benRegId={}", benRegId); - return dto; - - } catch (org.springframework.orm.jpa.JpaSystemException e) { - retryCount++; - logger.warn("Database connection error for benRegId={}, attempt {}/{}: {}", - benRegId, retryCount, maxRetries, e.getMessage()); - - if (retryCount >= maxRetries) { - logger.error("Max retries reached for benRegId={}", benRegId); - return null; - } - - try { - Thread.sleep(1000 * retryCount); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - return null; - } - - } catch (Exception e) { - logger.error("Error fetching beneficiary from database: benRegId={}, error={}", - benRegId, e.getMessage(), e); - return null; - } - } - - return null; - } - - /** - * Convert MBeneficiarymapping entity to BeneficiariesDTO - */ - private BeneficiariesDTO convertToDTO(MBeneficiarymapping mapping) { - BeneficiariesDTO dto = new BeneficiariesDTO(); - - try { - dto.setBenRegId(mapping.getBenRegId()); - dto.setBenMapId(mapping.getBenMapId()); - - if (mapping.getMBeneficiaryregidmapping() != null) { - dto.setBenId(mapping.getMBeneficiaryregidmapping().getBeneficiaryID()); - } - - if (mapping.getMBeneficiarycontact() != null) { - dto.setPreferredPhoneNum(mapping.getMBeneficiarycontact().getPreferredPhoneNum()); - } - - if (mapping.getMBeneficiarydetail() != null) { - BenDetailDTO detailDTO = new BenDetailDTO(); - - detailDTO.setFirstName(mapping.getMBeneficiarydetail().getFirstName()); - detailDTO.setLastName(mapping.getMBeneficiarydetail().getLastName()); - - if (mapping.getMBeneficiarydetail().getDob() != null) { - detailDTO.setBeneficiaryAge(calculateAge(mapping.getMBeneficiarydetail().getDob())); - } - - dto.setBeneficiaryDetails(detailDTO); - } - - logger.debug("Successfully converted mapping to DTO: benRegId={}", mapping.getBenRegId()); - - } catch (Exception e) { - logger.error("Error converting mapping to DTO: {}", e.getMessage(), e); - } - - return dto; - } - - /** - * Calculate age from date of birth - */ - private Integer calculateAge(java.sql.Timestamp dob) { - try { - if (dob == null) { - return null; - } - - java.time.LocalDate birthDate = dob.toLocalDateTime().toLocalDate(); - java.time.LocalDate now = java.time.LocalDate.now(); - - return java.time.Period.between(birthDate, now).getYears(); - - } catch (Exception e) { - logger.error("Error calculating age: {}", e.getMessage()); - return null; - } - } -} \ No newline at end of file diff --git a/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryDocumentDataService.java b/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryDocumentDataService.java index e6aa965..cdca328 100644 --- a/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryDocumentDataService.java +++ b/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryDocumentDataService.java @@ -2,16 +2,20 @@ import java.math.BigInteger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import com.iemr.common.identity.data.elasticsearch.BeneficiaryDocument; import com.iemr.common.identity.repo.BenMappingRepo; +import com.iemr.common.identity.repo.V_BenAdvanceSearchRepo; /** * Optimized service to fetch complete beneficiary data in bulk @@ -25,6 +29,9 @@ public class BeneficiaryDocumentDataService { @Autowired private BenMappingRepo mappingRepo; + @Autowired + private V_BenAdvanceSearchRepo v_BenAdvanceSearchRepo; + /** * Fetch multiple beneficiaries with COMPLETE data in ONE query * This is the KEY method that replaces multiple individual queries @@ -37,10 +44,14 @@ public List getBeneficiariesBatch(List benRegId try { logger.debug("Fetching {} beneficiaries with complete data", benRegIds.size()); - + List results = mappingRepo.findCompleteDataByBenRegIds(benRegIds); - + logger.info("Fetched {} complete beneficiary records", results.size()); + // Batch fetch ABHA details for ALL beneficiaries at once + Map abhaMap = batchFetchAbhaData(benRegIds); + + logger.info("Fetched ABHA details for {} beneficiaries", abhaMap.size()); List documents = new ArrayList<>(); @@ -48,6 +59,17 @@ public List getBeneficiariesBatch(List benRegId try { BeneficiaryDocument doc = mapRowToDocument(row); if (doc != null && doc.getBenId() != null) { + + AbhaData abhaData = abhaMap.get(doc.getBenRegId()); + if (abhaData != null) { + doc.setHealthID(abhaData.getHealthID()); + doc.setAbhaID(abhaData.getHealthIDNumber()); + doc.setAbhaCreatedDate(abhaData.getAbhaCreatedDate()); + logger.info("Enriched benRegId={} with healthID={}, abhaID={}", + doc.getBenRegId(), doc.getHealthID(), doc.getAbhaID()); + } else { + logger.debug("No ABHA details for benRegId={}", doc.getBenRegId()); + } documents.add(doc); } } catch (Exception e) { @@ -64,6 +86,86 @@ public List getBeneficiariesBatch(List benRegId } } + private Map batchFetchAbhaData(List benRegIds) { + try { + return batchFetchAbhaDetails(benRegIds); + } catch (Exception e) { + logger.warn("Error fetching ABHA details (will continue without ABHA data): {}", e.getMessage()); + return new HashMap<>(); // Return empty map to continue processing + } + } + + /** + * Batch fetch ABHA details for multiple beneficiaries + * Returns a map of benRegId -> AbhaData + */ + @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true, timeout = 30) + private Map batchFetchAbhaDetails(List benRegIds) { + Map abhaMap = new HashMap<>(); + + try { + if (benRegIds == null || benRegIds.isEmpty()) { + logger.info("No beneficiary IDs provided for ABHA fetch"); + return abhaMap; + } + logger.debug("Batch fetching ABHA details for {} beneficiaries", benRegIds.size()); + + List abhaRecords = null; + try { + abhaRecords = v_BenAdvanceSearchRepo.getBenAbhaDetailsByBenRegIDs(benRegIds); + } catch (Exception e) { + logger.warn("ABHA query returned error (likely no records): {}", e.getMessage()); + return abhaMap; // Return empty map - this is OK + } + + if (abhaRecords == null || abhaRecords.isEmpty()) { + logger.debug("No ABHA records found for this batch (this is normal)"); + return abhaMap; + } + + logger.debug("Retrieved {} ABHA records", abhaRecords.size()); + + for (Object[] record : abhaRecords) { + try { + // record[0] -> BeneficiaryRegID + // record[1] -> HealthID (ABHA Address) + // record[2] -> HealthIDNumber (ABHA Number) + // record[3] -> AuthenticationMode + // record[4] -> CreatedDate + + Long benRegId = null; + if (record[0] instanceof BigInteger) { + benRegId = ((BigInteger) record[0]).longValue(); + } else if (record[0] instanceof Long) { + benRegId = (Long) record[0]; + } else if (record[0] instanceof Integer) { + benRegId = ((Integer) record[0]).longValue(); + } + + if (benRegId != null && !abhaMap.containsKey(benRegId)) { + // Only store the first (most recent) record for each beneficiary + AbhaData abhaData = new AbhaData(); + abhaData.setHealthID(record[1] != null ? record[1].toString() : null); + abhaData.setHealthIDNumber(record[2] != null ? record[2].toString() : null); + abhaData.setAuthenticationMode(record[3] != null ? record[3].toString() : null); + abhaData.setAbhaCreatedDate(record[4] != null ? record[4].toString() : null); + + abhaMap.put(benRegId, abhaData); + } + } catch (Exception e) { + logger.error("Error processing ABHA record: {}", e.getMessage()); + } + } + + logger.debug("Processed {} unique ABHA records into map", abhaMap.size()); + + } catch (Exception e) { + logger.error("Error batch fetching ABHA details: {}", e.getMessage(), e); + } + + return abhaMap; + } + /** * Single beneficiary fetch (for backward compatibility) */ @@ -71,52 +173,83 @@ public List getBeneficiariesBatch(List benRegId public BeneficiaryDocument getBeneficiaryFromDatabase(BigInteger benRegId) { List ids = new ArrayList<>(); ids.add(benRegId); - + List results = getBeneficiariesBatch(ids); return results.isEmpty() ? null : results.get(0); } - + + /** + * Fetch single beneficiary WITH fresh ABHA data + * Use this for real-time sync (create/update operations) + */ + @Transactional(readOnly = true, timeout = 10) + public BeneficiaryDocument getBeneficiaryWithAbhaDetails(BigInteger benRegId) { + if (benRegId == null) { + return null; + } + // Fetch beneficiary + ABHA in one call + List ids = List.of(benRegId); + List results = getBeneficiariesBatch(ids); + + if (results.isEmpty()) { + logger.warn("No beneficiary found for benRegId={}", benRegId); + return null; + } + + BeneficiaryDocument doc = results.get(0); + + // Log for debugging + if (doc.getHealthID() != null || doc.getAbhaID() != null) { + logger.info("Beneficiary has ABHA: benRegId={}, healthID={}, abhaID={}", + benRegId, doc.getHealthID(), doc.getAbhaID()); + } else { + logger.debug("No ABHA details for benRegId={}", benRegId); + } + + return doc; + } + /** * Map database row to BeneficiaryDocument for ES * Matches the query column order from BenMappingRepo */ private BeneficiaryDocument mapRowToDocument(Object[] row) { BeneficiaryDocument doc = new BeneficiaryDocument(); - + try { int idx = 0; - + // Basic IDs (0-1) Long benRegId = getLong(row[idx++]); doc.setBenRegId(benRegId); - String beneficiaryID = getString(row[idx++]); - if (beneficiaryID != null && !beneficiaryID.isEmpty()) { + String beneficiaryID = getString(row[idx++]); + if (beneficiaryID != null && !beneficiaryID.isEmpty()) { doc.setBenId(beneficiaryID); - } + } doc.setBeneficiaryID(beneficiaryID); - + doc.setFirstName(getString(row[idx++])); doc.setLastName(getString(row[idx++])); doc.setGenderID(getInteger(row[idx++])); doc.setGenderName(getString(row[idx++])); - doc.setGender(doc.getGenderName()); + doc.setGender(doc.getGenderName()); doc.setDOB(getDate(row[idx++])); doc.setAge(getInteger(row[idx++])); doc.setFatherName(getString(row[idx++])); doc.setSpouseName(getString(row[idx++])); doc.setIsHIVPos(getString(row[idx++])); - + doc.setCreatedBy(getString(row[idx++])); doc.setCreatedDate(getDate(row[idx++])); doc.setLastModDate(getLong(row[idx++])); doc.setBenAccountID(getLong(row[idx++])); - + doc.setPhoneNum(getString(row[idx++])); - - doc.setHealthID(getString(row[idx++])); - doc.setAbhaID(getString(row[idx++])); + + // doc.setHealthID(getString(row[idx++])); + // doc.setAbhaID(getString(row[idx++])); doc.setFamilyID(getString(row[idx++])); - + doc.setStateID(getInteger(row[idx++])); doc.setStateName(getString(row[idx++])); doc.setDistrictID(getInteger(row[idx++])); @@ -129,7 +262,7 @@ private BeneficiaryDocument mapRowToDocument(Object[] row) { doc.setServicePointID(getInteger(row[idx++])); doc.setServicePointName(getString(row[idx++])); doc.setParkingPlaceID(getInteger(row[idx++])); - + doc.setPermStateID(getInteger(row[idx++])); doc.setPermStateName(getString(row[idx++])); doc.setPermDistrictID(getInteger(row[idx++])); @@ -138,54 +271,107 @@ private BeneficiaryDocument mapRowToDocument(Object[] row) { doc.setPermBlockName(getString(row[idx++])); doc.setPermVillageID(getInteger(row[idx++])); doc.setPermVillageName(getString(row[idx++])); - + // doc.setGovtIdentityNo(getString(row[idx++])); // String aadhar = getString(row[idx]); // doc.setAadharNo(aadhar != null ? aadhar : doc.getGovtIdentityNo()); - + } catch (Exception e) { logger.error("Error mapping row to document: {}", e.getMessage(), e); } - + return doc; } - + // Helper methods private String getString(Object value) { return value != null ? value.toString() : null; } - + private Long getLong(Object value) { - if (value == null) return null; - if (value instanceof Long) return (Long) value; - if (value instanceof Integer) return ((Integer) value).longValue(); - if (value instanceof BigInteger) return ((BigInteger) value).longValue(); + if (value == null) + return null; + if (value instanceof Long) + return (Long) value; + if (value instanceof Integer) + return ((Integer) value).longValue(); + if (value instanceof BigInteger) + return ((BigInteger) value).longValue(); try { return Long.parseLong(value.toString()); } catch (NumberFormatException e) { return null; } } - + private Integer getInteger(Object value) { - if (value == null) return null; - if (value instanceof Integer) return (Integer) value; - if (value instanceof Long) return ((Long) value).intValue(); - if (value instanceof BigInteger) return ((BigInteger) value).intValue(); + if (value == null) + return null; + if (value instanceof Integer) + return (Integer) value; + if (value instanceof Long) + return ((Long) value).intValue(); + if (value instanceof BigInteger) + return ((BigInteger) value).intValue(); try { return Integer.parseInt(value.toString()); } catch (NumberFormatException e) { return null; } } - + private java.util.Date getDate(Object value) { - if (value == null) return null; - if (value instanceof java.util.Date) return (java.util.Date) value; - if (value instanceof java.sql.Timestamp) + if (value == null) + return null; + if (value instanceof java.util.Date) + return (java.util.Date) value; + if (value instanceof java.sql.Timestamp) return new java.util.Date(((java.sql.Timestamp) value).getTime()); - if (value instanceof java.sql.Date) + if (value instanceof java.sql.Date) return new java.util.Date(((java.sql.Date) value).getTime()); return null; } + + /** + * Inner class to hold ABHA data + */ + private static class AbhaData { + private String healthID; + private String healthIDNumber; + private String authenticationMode; + private String abhaCreatedDate; + + public String getHealthID() { + return healthID; + } + + public void setHealthID(String healthID) { + this.healthID = healthID; + } + + public String getHealthIDNumber() { + return healthIDNumber; + } + + public void setHealthIDNumber(String healthIDNumber) { + this.healthIDNumber = healthIDNumber; + } + + public String getAuthenticationMode() { + return authenticationMode; + } + + public void setAuthenticationMode(String authenticationMode) { + this.authenticationMode = authenticationMode; + } + + public String getAbhaCreatedDate() { + return abhaCreatedDate; + } + + public void setAbhaCreatedDate(String abhaCreatedDate) { + this.abhaCreatedDate = abhaCreatedDate; + } + } + } \ No newline at end of file diff --git a/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryElasticsearchIndexUpdater.java b/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryElasticsearchIndexUpdater.java index 82d6858..a0255d8 100644 --- a/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryElasticsearchIndexUpdater.java +++ b/src/main/java/com/iemr/common/identity/service/elasticsearch/BeneficiaryElasticsearchIndexUpdater.java @@ -1,6 +1,7 @@ package com.iemr.common.identity.service.elasticsearch; import java.math.BigInteger; +import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,6 +11,7 @@ import org.springframework.stereotype.Service; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.Refresh; import co.elastic.clients.elasticsearch.core.DeleteRequest; import co.elastic.clients.elasticsearch.core.IndexRequest; @@ -41,7 +43,7 @@ public class BeneficiaryElasticsearchIndexUpdater { * Called after beneficiary is created/updated in database */ @Async("elasticsearchSyncExecutor") - public void syncBeneficiaryAsync(BigInteger benRegId) { + public void syncBeneficiaryAsyncOld(BigInteger benRegId) { if (!esEnabled) { logger.debug("Elasticsearch is disabled, skipping sync"); return; @@ -63,15 +65,14 @@ public void syncBeneficiaryAsync(BigInteger benRegId) { } IndexRequest request = IndexRequest.of(i -> i - .index(beneficiaryIndex) - .id(doc.getBenId()) - .document(doc) - ); + .index(beneficiaryIndex) + .id(doc.getBenId()) + .document(doc)); esClient.index(request); - logger.info("Successfully synced beneficiary to Elasticsearch: benRegId={}, benId={}", - benRegId, doc.getBenId()); + logger.info("Successfully synced beneficiary to Elasticsearch: benRegId={}, benId={}", + benRegId, doc.getBenId()); } catch (Exception e) { logger.error("Error syncing beneficiary {} to Elasticsearch: {}", benRegId, e.getMessage(), e); @@ -92,9 +93,8 @@ public void deleteBeneficiaryAsync(String benId) { logger.info("Starting async delete for benId: {}", benId); DeleteRequest request = DeleteRequest.of(d -> d - .index(beneficiaryIndex) - .id(benId) - ); + .index(beneficiaryIndex) + .id(benId)); esClient.delete(request); @@ -104,4 +104,31 @@ public void deleteBeneficiaryAsync(String benId) { logger.error("Error deleting beneficiary {} from Elasticsearch: {}", benId, e.getMessage(), e); } } + + @Async + public CompletableFuture syncBeneficiaryAsync(BigInteger benRegId) { + BeneficiaryDocument document = dataService.getBeneficiaryWithAbhaDetails(benRegId); + if (document == null) { + logger.warn("No data found for benRegId: {}", benRegId); + return CompletableFuture.completedFuture(null); + } + + // Log ABHA for verification + logger.info("Syncing benRegId={} with ABHA: healthID={}, abhaID={}", + benRegId, document.getHealthID(), document.getAbhaID()); + + try{ + // Index to ES + esClient.index(i -> i + .index(beneficiaryIndex) + .id(String.valueOf(benRegId)) + .document(document).refresh(Refresh.True)); + + logger.info("Successfully synced benRegId: {} to ES", benRegId); + } catch (Exception e) { + logger.error("Error syncing beneficiary {} to Elasticsearch: {}", benRegId, e.getMessage(), e); + } + return CompletableFuture.completedFuture(null); + + } } \ No newline at end of file diff --git a/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchIndexingService.java b/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchIndexingService.java index 941a055..44d13d3 100644 --- a/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchIndexingService.java +++ b/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchIndexingService.java @@ -2,7 +2,9 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.mapping.*; -import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; +import co.elastic.clients.elasticsearch.indices.IndexSettings; +import co.elastic.clients.elasticsearch.indices.TranslogDurability; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -27,10 +29,11 @@ public class ElasticsearchIndexingService { private String beneficiaryIndex; /** - * Create or recreate the Elasticsearch index with proper mapping + * Create index optimized for BULK INDEXING + * Settings will be updated for search after sync completes */ public void createIndexWithMapping() throws Exception { - logger.info("Creating index with mapping: {}", beneficiaryIndex); + logger.info("Creating index optimized for bulk indexing: {}", beneficiaryIndex); // Delete existing index if it exists if (esClient.indices().exists(e -> e.index(beneficiaryIndex)).value()) { @@ -38,24 +41,77 @@ public void createIndexWithMapping() throws Exception { esClient.indices().delete(d -> d.index(beneficiaryIndex)); } - // Create index with mapping + // PHASE 1 SETTINGS: Optimized for BULK INDEXING (maximum write speed) + IndexSettings settings = IndexSettings.of(s -> s + // CRITICAL: Disable refresh during bulk indexing + .refreshInterval(t -> t.time("-1")) // -1 = disable completely for max speed + + // Use 1 shard for datasets < 50GB (optimal for 784K records) + .numberOfShards("1") + + // No replicas during initial indexing (add later for HA) + .numberOfReplicas("0") + + // Disable query cache during indexing + .queries(q -> q + .cache(c -> c.enabled(false)) + ) + + .maxResultWindow(10000) + + // CRITICAL: Async translog for maximum speed + .translog(t -> t + .durability(TranslogDurability.Async) + .syncInterval(ts -> ts.time("120s")) // Longer interval for bulk ops + ) + ); + + // Field mappings (supports fast search) TypeMapping mapping = TypeMapping.of(tm -> tm .properties("benId", Property.of(p -> p.keyword(k -> k))) .properties("benRegId", Property.of(p -> p.long_(l -> l))) .properties("beneficiaryID", Property.of(p -> p.keyword(k -> k))) + .properties("firstName", Property.of(p -> p.text(t -> t - .fields("keyword", Property.of(fp -> fp.keyword(k -> k)))))) + .analyzer("standard") + .fields("keyword", Property.of(fp -> fp.keyword(k -> k.ignoreAbove(256)))) + .fields("prefix", Property.of(fp -> fp.text(txt -> txt + .analyzer("standard") + .indexPrefixes(ip -> ip.minChars(2).maxChars(5)) + ))) + ))) + .properties("lastName", Property.of(p -> p.text(t -> t - .fields("keyword", Property.of(fp -> fp.keyword(k -> k)))))) + .analyzer("standard") + .fields("keyword", Property.of(fp -> fp.keyword(k -> k.ignoreAbove(256)))) + .fields("prefix", Property.of(fp -> fp.text(txt -> txt + .analyzer("standard") + .indexPrefixes(ip -> ip.minChars(2).maxChars(5)) + ))) + ))) + + .properties("fatherName", Property.of(p -> p.text(t -> t + .analyzer("standard") + .fields("keyword", Property.of(fp -> fp.keyword(k -> k.ignoreAbove(256)))) + ))) + + .properties("spouseName", Property.of(p -> p.text(t -> t + .analyzer("standard") + .fields("keyword", Property.of(fp -> fp.keyword(k -> k.ignoreAbove(256)))) + ))) + .properties("genderID", Property.of(p -> p.integer(i -> i))) .properties("genderName", Property.of(p -> p.keyword(k -> k))) - .properties("dOB", Property.of(p -> p.date(d -> d))) + .properties("dOB", Property.of(p -> p.date(d -> d.format("strict_date_optional_time||epoch_millis")))) .properties("age", Property.of(p -> p.integer(i -> i))) - .properties("phoneNum", Property.of(p -> p.keyword(k -> k))) - .properties("fatherName", Property.of(p -> p.text(t -> t - .fields("keyword", Property.of(fp -> fp.keyword(k -> k)))))) - .properties("spouseName", Property.of(p -> p.text(t -> t - .fields("keyword", Property.of(fp -> fp.keyword(k -> k)))))) + + .properties("phoneNum", Property.of(p -> p.keyword(k -> k + .fields("ngram", Property.of(fp -> fp.text(txt -> txt + .analyzer("standard") + .searchAnalyzer("standard") + ))) + ))) + .properties("isHIVPos", Property.of(p -> p.keyword(k -> k))) .properties("createdBy", Property.of(p -> p.keyword(k -> k))) .properties("createdDate", Property.of(p -> p.date(d -> d))) @@ -64,8 +120,10 @@ public void createIndexWithMapping() throws Exception { .properties("healthID", Property.of(p -> p.keyword(k -> k))) .properties("abhaID", Property.of(p -> p.keyword(k -> k))) + .properties("abhaCreatedDate", Property.of(p -> p.keyword(k -> k))) .properties("familyID", Property.of(p -> p.keyword(k -> k))) + // Geographic fields .properties("stateID", Property.of(p -> p.integer(i -> i))) .properties("stateName", Property.of(p -> p.keyword(k -> k))) .properties("districtID", Property.of(p -> p.integer(i -> i))) @@ -79,6 +137,7 @@ public void createIndexWithMapping() throws Exception { .properties("servicePointName", Property.of(p -> p.keyword(k -> k))) .properties("parkingPlaceID", Property.of(p -> p.integer(i -> i))) + // Permanent address fields .properties("permStateID", Property.of(p -> p.integer(i -> i))) .properties("permStateName", Property.of(p -> p.keyword(k -> k))) .properties("permDistrictID", Property.of(p -> p.integer(i -> i))) @@ -88,28 +147,100 @@ public void createIndexWithMapping() throws Exception { .properties("permVillageID", Property.of(p -> p.integer(i -> i))) .properties("permVillageName", Property.of(p -> p.keyword(k -> k))) + // Identity fields .properties("aadharNo", Property.of(p -> p.keyword(k -> k))) .properties("govtIdentityNo", Property.of(p -> p.keyword(k -> k))) ); esClient.indices().create(c -> c .index(beneficiaryIndex) + .settings(settings) .mappings(mapping) ); - logger.info("Index created successfully: {}", beneficiaryIndex); + logger.info("Index created with BULK INDEXING optimization"); + logger.info("Settings: refresh=disabled, replicas=0, async_translog, 1 shard"); + } + + /** + * PHASE 2: Optimize for SEARCH after bulk indexing completes + * Call this AFTER indexAllBeneficiaries() finishes + */ + public void optimizeForSearch() throws Exception { + logger.info("PHASE 2: Optimizing index for SEARCH performance"); + + // Step 1: Force refresh to make all documents searchable + logger.info("Step 1/3: Forcing refresh to make documents visible..."); + esClient.indices().refresh(r -> r.index(beneficiaryIndex)); + logger.info("Documents are now searchable"); + + // Step 2: Update settings for production search + logger.info("Step 2/3: Updating index settings for production..."); + esClient.indices().putSettings(s -> s + .index(beneficiaryIndex) + .settings(is -> is + .refreshInterval(t -> t.time("1s")) // Enable 1s refresh for near real-time search + .numberOfReplicas("1") // Add replica for high availability + .translog(t -> t + .durability(TranslogDurability.Request) // Synchronous for data safety + .syncInterval(ts -> ts.time("5s")) + ) + .queries(q -> q + .cache(c -> c.enabled(true)) // Enable query cache for faster searches + ) + ) + ); + logger.info("Settings applied: refresh=1s, replicas=1, query_cache=enabled"); + + // Step 3: Force merge to optimize segments + logger.info("Step 3/3: Force merging segments for optimal read performance..."); + logger.info("This may take 5-15 minutes depending on data size..."); + esClient.indices().forcemerge(f -> f + .index(beneficiaryIndex) + .maxNumSegments(1L) // Single segment per shard = fastest searches + .flush(true) + ); + logger.info("Segments merged to 1 per shard"); + + logger.info("INDEX OPTIMIZATION COMPLETE!"); + logger.info("Index is now ready for searches"); } /** - * Index all beneficiaries - delegates to existing sync service - * This is much safer than loading all records at once + * COMPLETE WORKFLOW: Create index + Sync data + Optimize + * This is your existing endpoint, now with automatic optimization */ public Map indexAllBeneficiaries() { - logger.info("Starting full indexing via sync service..."); + logger.info("COMPLETE INDEXING WORKFLOW"); + + long startTime = System.currentTimeMillis(); try { + // Execute bulk indexing (now uses optimized batch queries) + logger.info("PHASE 1: Bulk indexing beneficiaries with batch queries..."); ElasticsearchSyncService.SyncResult result = syncService.syncAllBeneficiaries(); + long indexingTime = System.currentTimeMillis() - startTime; + logger.info("Bulk indexing completed in {} seconds ({} minutes)", + indexingTime / 1000, indexingTime / 60000); + logger.info("Success: {}, Failed: {}", result.getSuccessCount(), result.getFailureCount()); + + // Optimize for search + logger.info(""); + logger.info("PHASE 2: Optimizing for search..."); + long optimizeStart = System.currentTimeMillis(); + optimizeForSearch(); + long optimizeTime = System.currentTimeMillis() - optimizeStart; + logger.info("Optimization completed in {} seconds ({} minutes)", + optimizeTime / 1000, optimizeTime / 60000); + + long totalTime = System.currentTimeMillis() - startTime; + + logger.info("COMPLETE WORKFLOW FINISHED!"); + logger.info("Total time: {} seconds ({} minutes)", totalTime / 1000, totalTime / 60000); + logger.info("Indexing: {}m | Optimization: {}m", indexingTime / 60000, optimizeTime / 60000); + + // Return response in your existing format Map response = new HashMap<>(); response.put("success", result.getSuccessCount()); response.put("failed", result.getFailureCount()); @@ -117,11 +248,28 @@ public Map indexAllBeneficiaries() { return response; } catch (Exception e) { - logger.error("Error during indexing", e); + logger.error("Error during indexing workflow", e); Map response = new HashMap<>(); response.put("success", 0); response.put("failed", 0); return response; } } + + /** + * Get index statistics (unchanged) + */ + public Map getIndexStats() throws Exception { + var stats = esClient.indices().stats(s -> s.index(beneficiaryIndex)); + var settings = esClient.indices().getSettings(g -> g.index(beneficiaryIndex)); + + Map info = new HashMap<>(); + info.put("documentCount", stats.indices().get(beneficiaryIndex).primaries().docs().count()); + info.put("sizeInBytes", stats.indices().get(beneficiaryIndex).primaries().store().sizeInBytes()); + info.put("refreshInterval", settings.get(beneficiaryIndex).settings().index().refreshInterval().time()); + info.put("numberOfShards", settings.get(beneficiaryIndex).settings().index().numberOfShards()); + info.put("numberOfReplicas", settings.get(beneficiaryIndex).settings().index().numberOfReplicas()); + + return info; + } } \ No newline at end of file diff --git a/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchService.java b/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchService.java index a8b10ae..b65f435 100644 --- a/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchService.java +++ b/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchService.java @@ -7,10 +7,7 @@ import co.elastic.clients.elasticsearch._types.query_dsl.TextQueryType; import co.elastic.clients.elasticsearch.core.SearchResponse; -import com.iemr.common.identity.domain.Address; -import com.iemr.common.identity.dto.BeneficiariesDTO; import com.iemr.common.identity.dto.BeneficiariesESDTO; -import com.iemr.common.identity.dto.IdentitySearchDTO; import com.iemr.common.identity.repo.BenDetailRepo; import org.slf4j.Logger; @@ -20,464 +17,378 @@ import org.springframework.stereotype.Service; import java.math.BigDecimal; -import java.math.BigInteger; import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; import co.elastic.clients.elasticsearch._types.SortOrder; -import co.elastic.clients.elasticsearch._types.query_dsl.FunctionScore; -import co.elastic.clients.elasticsearch._types.query_dsl.FunctionScoreMode; -import co.elastic.clients.elasticsearch._types.query_dsl.FunctionBoostMode; import com.iemr.common.identity.repo.BenAddressRepo; @Service public class ElasticsearchService { - + private static final Logger logger = LoggerFactory.getLogger(ElasticsearchService.class); - + @Autowired private ElasticsearchClient esClient; - + @Autowired private BenDetailRepo benDetailRepo; @Autowired private BenAddressRepo benAddressRepo; - + @Value("${elasticsearch.index.beneficiary}") private String beneficiaryIndex; - + @Value("${elasticsearch.enabled}") private boolean esEnabled; - -/** - * Universal search with score-based filtering and location ranking - * Only returns records that actually match the query (not all 10000) - */ -public List> universalSearch(String query, Integer userId) { - try { - final Map userLocation = - (userId != null) ? getUserLocation(userId) : null; - - - boolean isNumeric = query.matches("\\d+"); - - // Determine minimum score threshold based on query type - double minScore = isNumeric ? 1.0 : 3.0; - - SearchResponse response = esClient.search(s -> s - .index(beneficiaryIndex) - .query(q -> q - .functionScore(fs -> fs - .query(qq -> qq - .bool(b -> { - if (!isNumeric) { - // Name searches with higher boost for exact matches - b.should(s1 -> s1.multiMatch(mm -> mm - .query(query) - .fields("firstName^3", "lastName^3", "fatherName^2", "spouseName^2") - .type(TextQueryType.BestFields) - .fuzziness("AUTO") - )); - - // Exact keyword matches get highest boost - b.should(s2 -> s2.term(t -> t.field("firstName.keyword").value(query).boost(5.0f))); - b.should(s3 -> s3.term(t -> t.field("lastName.keyword").value(query).boost(5.0f))); - b.should(s4 -> s4.term(t -> t.field("fatherName.keyword").value(query).boost(4.0f))); - b.should(s5 -> s5.term(t -> t.field("spouseName.keyword").value(query).boost(4.0f))); - } - - // ID searches - exact matches get very high boost - b.should(s6 -> s6.term(t -> t.field("healthID").value(query).boost(10.0f))); - b.should(s7 -> s7.term(t -> t.field("abhaID").value(query).boost(10.0f))); - b.should(s8 -> s8.term(t -> t.field("familyID").value(query).boost(8.0f))); - b.should(s9 -> s9.term(t -> t.field("beneficiaryID").value(query).boost(10.0f))); - b.should(s10 -> s10.term(t -> t.field("benId").value(query).boost(10.0f))); - b.should(s11 -> s11.term(t -> t.field("aadharNo").value(query).boost(9.0f))); - b.should(s12 -> s12.term(t -> t.field("govtIdentityNo").value(query).boost(8.0f))); - - if (isNumeric) { - // Partial matches for numeric fields - b.should(s13 -> s13.wildcard(w -> w.field("phoneNum").value("*" + query + "*").boost(3.0f))); - b.should(s14 -> s14.wildcard(w -> w.field("healthID").value("*" + query + "*").boost(2.0f))); - b.should(s15 -> s15.wildcard(w -> w.field("abhaID").value("*" + query + "*").boost(2.0f))); - b.should(s16 -> s16.wildcard(w -> w.field("familyID").value("*" + query + "*").boost(2.0f))); - b.should(s17 -> s17.wildcard(w -> w.field("beneficiaryID").value("*" + query + "*").boost(2.0f))); - b.should(s18 -> s18.wildcard(w -> w.field("benId").value("*" + query + "*").boost(2.0f))); - b.should(s19 -> s19.wildcard(w -> w.field("aadharNo").value("*" + query + "*").boost(2.0f))); - b.should(s20 -> s20.wildcard(w -> w.field("govtIdentityNo").value("*" + query + "*").boost(2.0f))); - - // Prefix matches - b.should(s21 -> s21.prefix(p -> p.field("phoneNum").value(query).boost(4.0f))); - b.should(s22 -> s22.prefix(p -> p.field("healthID").value(query).boost(3.0f))); - b.should(s23 -> s23.prefix(p -> p.field("abhaID").value(query).boost(3.0f))); - b.should(s24 -> s24.prefix(p -> p.field("familyID").value(query).boost(3.0f))); - b.should(s25 -> s25.prefix(p -> p.field("beneficiaryID").value(query).boost(3.0f))); - b.should(s26 -> s26.prefix(p -> p.field("benId").value(query).boost(3.0f))); - - try { - Long numericValue = Long.parseLong(query); - b.should(s27 -> s27.term(t -> t.field("benRegId").value(numericValue).boost(10.0f))); - b.should(s28 -> s28.term(t -> t.field("benAccountID").value(numericValue).boost(8.0f))); - - int intValue = numericValue.intValue(); - b.should(s29 -> s29.term(t -> t.field("genderID").value(intValue).boost(2.0f))); - b.should(s30 -> s30.term(t -> t.field("age").value(intValue).boost(1.0f))); - b.should(s31 -> s31.term(t -> t.field("stateID").value(intValue).boost(1.0f))); - b.should(s32 -> s32.term(t -> t.field("districtID").value(intValue).boost(1.0f))); - b.should(s33 -> s33.term(t -> t.field("blockID").value(intValue).boost(1.0f))); - b.should(s34 -> s34.term(t -> t.field("villageID").value(intValue).boost(1.0f))); - b.should(s35 -> s35.term(t -> t.field("servicePointID").value(intValue).boost(1.0f))); - b.should(s36 -> s36.term(t -> t.field("parkingPlaceID").value(intValue).boost(1.0f))); - - logger.info("Added numeric searches for value: {}", numericValue); - } catch (NumberFormatException e) { - logger.warn("Failed to parse numeric value: {}", query); - } - } - - b.minimumShouldMatch("1"); - return b; - }) - ) - // Add location-based scoring if user location is available - .functions(getFunctionScores(userLocation)) - .scoreMode(FunctionScoreMode.Sum) - .boostMode(FunctionBoostMode.Multiply) - ) - ) - .minScore(minScore) - .size(500) - .sort(so -> so - .score(sc -> sc.order(SortOrder.Desc)) - ) - , BeneficiariesESDTO.class); - - logger.info("ES returned {} hits for query: '{}' (min score: {})", - response.hits().hits().size(), query, minScore); - - List> allResults = response.hits().hits().stream() - .map(hit -> { - if (hit.source() != null) { - logger.debug("Hit score: {}, benRegId: {}, name: {} {}", - hit.score(), - hit.source().getBenRegId(), - hit.source().getFirstName(), - hit.source().getLastName()); - } - Map result = mapESResultToExpectedFormat(hit.source()); - if (result != null) { - result.put("_score", hit.score()); - } - return result; - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + /** + * Universal search with score-based filtering and location ranking + * Only returns records that actually match the query (not all 10000) + */ + public List> universalSearch(String query, Integer userId) { + try { + final Map userLocation = (userId != null) ? getUserLocation(userId) : null; + + boolean isNumeric = query.matches("\\d+"); + double minScore = isNumeric ? 1.0 : 2.0; + + SearchResponse response = esClient.search(s -> s + .index(beneficiaryIndex) + + .preference("_local") + + .requestCache(true) + + .query(q -> q + .functionScore(fs -> fs + .query(qq -> qq + .bool(b -> { + if (!isNumeric) { + // OPTIMIZED NAME SEARCH + // Use match_phrase_prefix for faster prefix matching + b.should(s1 -> s1.multiMatch(mm -> mm + .query(query) + .fields("firstName^3", "lastName^3") + .type(TextQueryType.Phrase) + .boost(3.0f))); + + b.should(s2 -> s2.term(t -> t + .field("firstName.keyword") + .value(query) + .boost(10.0f))); + b.should(s3 -> s3.term(t -> t + .field("lastName.keyword") + .value(query) + .boost(10.0f))); + + // Prefix search using index_prefixes (FAST!) + b.should(s4 -> s4.match(m -> m + .field("firstName.prefix") + .query(query) + .boost(2.0f))); + b.should(s5 -> s5.match(m -> m + .field("lastName.prefix") + .query(query) + .boost(2.0f))); + } + + b.should(s6 -> s6 + .term(t -> t.field("healthID").value(query).boost(15.0f))); + b.should(s7 -> s7 + .term(t -> t.field("abhaID").value(query).boost(15.0f))); + b.should(s8 -> s8 + .term(t -> t.field("beneficiaryID").value(query).boost(15.0f))); + b.should( + s9 -> s9.term(t -> t.field("benId").value(query).boost(15.0f))); + b.should(s10 -> s10 + .term(t -> t.field("aadharNo").value(query).boost(12.0f))); + + if (isNumeric) { + // PREFIX QUERIES (much faster than wildcard) + b.should(s11 -> s11 + .prefix(p -> p.field("phoneNum").value(query).boost(5.0f))); + b.should(s12 -> s12 + .prefix(p -> p.field("healthID").value(query).boost(4.0f))); + b.should(s13 -> s13 + .prefix(p -> p.field("abhaID").value(query).boost(4.0f))); + b.should(s14 -> s14.prefix( + p -> p.field("beneficiaryID").value(query).boost(4.0f))); + + // ONLY use wildcard if query is long enough (>= 4 digits) + if (query.length() >= 4) { + b.should(s15 -> s15.wildcard(w -> w + .field("phoneNum") + .value("*" + query + "*") + .boost(2.0f))); + } + + try { + Long numericValue = Long.parseLong(query); + b.should(s16 -> s16.term(t -> t.field("benRegId") + .value(numericValue).boost(15.0f))); + b.should(s17 -> s17.term(t -> t.field("benAccountID") + .value(numericValue).boost(10.0f))); + + int intValue = numericValue.intValue(); + if (userLocation != null) { + Integer userVillageId = userLocation.get("villageId"); + Integer userBlockId = userLocation.get("blockId"); + + if (userVillageId != null && userVillageId == intValue) { + b.should(s18 -> s18.term(t -> t.field("villageID") + .value(intValue).boost(3.0f))); + } + if (userBlockId != null && userBlockId == intValue) { + b.should(s19 -> s19.term(t -> t.field("blockID") + .value(intValue).boost(2.0f))); + } + } + } catch (NumberFormatException e) { + } + } + + b.minimumShouldMatch("1"); + return b; + })) + .functions(getFunctionScores(userLocation)) + .scoreMode(FunctionScoreMode.Sum) + .boostMode(FunctionBoostMode.Multiply) + .maxBoost(5.0))) + .minScore(minScore) + + .size(100) // Reduced from 500 + + .sort(so -> so.score(sc -> sc.order(SortOrder.Desc))) + + .source(src -> src + .filter(f -> f + .includes("benRegId", "beneficiaryID", "firstName", "lastName", + "genderID", "genderName", "dOB", "phoneNum", + "stateID", "stateName", "districtID", "blockID", "villageID", "healthID", + "abhaID", + "abhaCreatedDate", + "familyID", + "fatherName", "spouseName", "age", "createdBy", "createdDate", + "lastModDate", "benAccountID", "districtName", "blockName", + "villageName", "pinCode", "servicePointID", "servicePointName", + "parkingPlaceID", "permStateID", "permStateName", "permDistrictID", + "permDistrictName", "permBlockID", "permBlockName", "permVillageID", + "permVillageName"))) + + , BeneficiariesESDTO.class); + + logger.info("ES returned {} hits in {}ms for query: '{}'", + response.hits().hits().size(), + response.took(), + query); + + if (!response.hits().hits().isEmpty()) { + BeneficiariesESDTO firstResult = response.hits().hits().get(0).source(); + logger.info("First result - benRegId: {}, healthID: {}, abhaID: {}", + firstResult.getBenRegId(), + firstResult.getHealthID(), + firstResult.getAbhaID()); + } - if (allResults.isEmpty()) { - logger.info("No results found in ES, falling back to database"); + if (response.hits().hits().isEmpty()) { + logger.info("No results in ES, using database fallback"); + return searchInDatabaseDirectly(query); + } + + List> results = response.hits().hits().stream() + .map(hit -> { + Map result = mapESResultToExpectedFormat(hit.source()); + if (result != null) { + result.put("_score", hit.score()); + } + return result; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + logger.info("Returning {} results", results.size()); + return results; + + } catch (Exception e) { + logger.error("ES search failed: {}", e.getMessage()); return searchInDatabaseDirectly(query); } + } - logger.info("Returning {} matched results", allResults.size()); - return allResults; + /** + * Generate function scores for location-based ranking + */ + private List getFunctionScores(Map userLocation) { + if (userLocation == null) { + return List.of(); + } - } catch (Exception e) { - logger.error("ES universal search failed: {}", e.getMessage(), e); - logger.info("Fallback: Searching in MySQL database"); - return searchInDatabaseDirectly(query); - } -} - -/** - * Generate function scores for location-based ranking - */ -private List getFunctionScores(Map userLocation) { - if (userLocation == null) { - return List.of(); - } + List scores = new ArrayList<>(); - List scores = new ArrayList<>(); - - Integer userVillageId = userLocation.get("villageId"); - Integer userBlockId = userLocation.get("blockId"); - - // Village match - highest boost - if (userVillageId != null) { - scores.add(FunctionScore.of(f -> f - .filter(ff -> ff.term(t -> t.field("villageID").value(userVillageId))) - .weight(3.0) - )); - } - - // Block match - medium boost - if (userBlockId != null) { - scores.add(FunctionScore.of(f -> f - .filter(ff -> ff.term(t -> t.field("blockID").value(userBlockId))) - .weight(2.0) - )); - } - - return scores; -} - -/** - * Advanced search with multiple criteria - only returns actual matches - */ -public List> advancedSearch( - String firstName, - String lastName, - Integer genderId, - Date dob, - Integer stateId, - Integer districtId, - Integer blockId, - Integer villageId, - String fatherName, - String spouseName, - String phoneNumber, - String beneficiaryId, - String healthId, - String aadharNo, - Integer userId) { - - try { - logger.info("ES Advanced Search - firstName: {}, lastName: {}, genderId: {}, stateId: {}, districtId: {}, blockId: {}, villageId: {}", - firstName, lastName, genderId, stateId, districtId, blockId, villageId); - - final Map userLocation = - (userId != null) ? getUserLocation(userId) : null; - - - SearchResponse response = esClient.search(s -> s - .index(beneficiaryIndex) - .query(q -> q - .functionScore(fs -> fs - .query(qq -> qq - .bool(b -> { - // Name searches with fuzzy matching and boost - if (firstName != null && !firstName.trim().isEmpty()) { - b.must(m -> m.bool(bb -> bb - .should(s1 -> s1.match(mm -> mm - .field("firstName") - .query(firstName) - .fuzziness("AUTO") - .boost(2.0f) - )) - .should(s2 -> s2.term(t -> t - .field("firstName.keyword") - .value(firstName) - .boost(5.0f) - )) - .minimumShouldMatch("1") - )); - } - - if (lastName != null && !lastName.trim().isEmpty()) { - b.must(m -> m.bool(bb -> bb - .should(s1 -> s1.match(mm -> mm - .field("lastName") - .query(lastName) - .fuzziness("AUTO") - .boost(2.0f) - )) - .should(s2 -> s2.term(t -> t - .field("lastName.keyword") - .value(lastName) - .boost(5.0f) - )) - .minimumShouldMatch("1") - )); - } - - if (fatherName != null && !fatherName.trim().isEmpty()) { - b.must(m -> m.match(mm -> mm - .field("fatherName") - .query(fatherName) - .fuzziness("AUTO") - .boost(2.0f) - )); - } - - if (spouseName != null && !spouseName.trim().isEmpty()) { - b.must(m -> m.match(mm -> mm - .field("spouseName") - .query(spouseName) - .fuzziness("AUTO") - .boost(2.0f) - )); - } - - // Exact matches for IDs and structured data - if (genderId != null) { - b.filter(m -> m.term(t -> t - .field("genderID") - .value(genderId) - )); - } - - if (dob != null) { - b.must(m -> m.term(t -> t - .field("dob") - .value(dob.getTime()) - .boost(5.0f) - )); - } - - // Location filters - if (stateId != null) { - b.filter(m -> m.term(t -> t - .field("stateID") - .value(stateId) - )); - } - - if (districtId != null) { - b.filter(m -> m.term(t -> t - .field("districtID") - .value(districtId) - )); - } - - if (blockId != null) { - b.filter(m -> m.term(t -> t - .field("blockID") - .value(blockId) - )); - } - - if (villageId != null) { - b.must(m -> m.term(t -> t - .field("villageID") - .value(villageId) - .boost(3.0f) - )); - } - - // Identity searches - if (phoneNumber != null && !phoneNumber.trim().isEmpty()) { - b.must(m -> m.bool(bb -> bb - .should(s1 -> s1.term(t -> t - .field("phoneNum") - .value(phoneNumber) - .boost(5.0f) - )) - .should(s2 -> s2.wildcard(w -> w - .field("phoneNum") - .value("*" + phoneNumber + "*") - .boost(2.0f) - )) - .minimumShouldMatch("1") - )); - } - - if (beneficiaryId != null && !beneficiaryId.trim().isEmpty()) { - b.must(m -> m.term(t -> t - .field("beneficiaryID") - .value(beneficiaryId) - .boost(10.0f) - )); - } - - if (healthId != null && !healthId.trim().isEmpty()) { - b.must(m -> m.bool(bb -> bb - .should(s1 -> s1.term(t -> t - .field("healthID") - .value(healthId) - .boost(10.0f) - )) - .should(s2 -> s2.term(t -> t - .field("abhaID") - .value(healthId) - .boost(10.0f) - )) - .minimumShouldMatch("1") - )); - } - - if (aadharNo != null && !aadharNo.trim().isEmpty()) { - b.must(m -> m.term(t -> t - .field("aadharNo") - .value(aadharNo) - .boost(10.0f) - )); - } - - return b; - }) - ) - // Add location-based scoring - .functions(getFunctionScores(userLocation)) - .scoreMode(FunctionScoreMode.Sum) - .boostMode(FunctionBoostMode.Multiply) - ) - ) - .minScore(2.0) - .size(500) - .sort(so -> so - .score(sc -> sc.order(SortOrder.Desc)) - ) - , BeneficiariesESDTO.class); - - logger.info("ES advanced search returned {} hits", response.hits().hits().size()); - - if (response.hits().hits().isEmpty()) { - logger.info("No results in ES, falling back to database"); - return searchInDatabaseForAdvanced(firstName, lastName, genderId, dob, - stateId, districtId, blockId, villageId, fatherName, spouseName, - phoneNumber, beneficiaryId, healthId, aadharNo); + Integer userVillageId = userLocation.get("villageId"); + Integer userBlockId = userLocation.get("blockId"); + + // Village match + if (userVillageId != null) { + scores.add(FunctionScore.of(f -> f + .filter(ff -> ff.term(t -> t.field("villageID").value(userVillageId))) + .weight(2.0))); } - List> results = response.hits().hits().stream() - .map(hit -> { - Map result = mapESResultToExpectedFormat(hit.source()); - if (result != null) { - result.put("_score", hit.score()); // Include score - } - return result; - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - logger.info("Returning {} matched results", results.size()); - return results; - - } catch (Exception e) { - logger.error("ES advanced search failed: {}", e.getMessage(), e); - logger.info("Fallback: Searching in MySQL database"); - return searchInDatabaseForAdvanced(firstName, lastName, genderId, dob, - stateId, districtId, blockId, villageId, fatherName, spouseName, - phoneNumber, beneficiaryId, healthId, aadharNo); + // Block match + if (userBlockId != null) { + scores.add(FunctionScore.of(f -> f + .filter(ff -> ff.term(t -> t.field("blockID").value(userBlockId))) + .weight(1.5))); + } + + return scores; } -} - -/** - * Database fallback for advanced search - */ -private List> searchInDatabaseForAdvanced( - String firstName, String lastName, Integer genderId, Date dob, - Integer stateId, Integer districtId, Integer blockId, Integer villageId, - String fatherName, String spouseName, String phoneNumber, - String beneficiaryId, String healthId, String aadharNo) { - - try { - List results = benDetailRepo.advancedSearchBeneficiaries( - firstName, lastName, genderId, dob, stateId, districtId, - blockId, fatherName, spouseName, phoneNumber, - beneficiaryId - ); - - return results.stream() - .map(this::mapToExpectedFormat) - .collect(Collectors.toList()); - - } catch (Exception e) { - logger.error("Database advanced search failed: {}", e.getMessage(), e); - return Collections.emptyList(); + + /** + * Advanced search with filter context + */ + public List> advancedSearch( + String firstName, + String lastName, + Integer genderId, + Date dob, + Integer stateId, + Integer districtId, + Integer blockId, + Integer villageId, + String fatherName, + String spouseName, + String phoneNumber, + String beneficiaryId, + String healthId, + String aadharNo, + Integer userId) { + + try { + final Map userLocation = (userId != null) ? getUserLocation(userId) : null; + + SearchResponse response = esClient.search(s -> s + .index(beneficiaryIndex) + .preference("_local") + .requestCache(true) + + .query(q -> q + .bool(b -> { + // Use FILTER context for exact matches (faster, cached) + if (genderId != null) { + b.filter(f -> f.term(t -> t.field("genderID").value(genderId))); + } + if (stateId != null) { + b.filter(f -> f.term(t -> t.field("stateID").value(stateId))); + } + if (districtId != null) { + b.filter(f -> f.term(t -> t.field("districtID").value(districtId))); + } + if (blockId != null) { + b.filter(f -> f.term(t -> t.field("blockID").value(blockId))); + } + if (villageId != null) { + b.filter(f -> f.term(t -> t.field("villageID").value(villageId))); + } + + // MUST context for scored searches + if (firstName != null && !firstName.trim().isEmpty()) { + b.must(m -> m.bool(bb -> bb + .should(s1 -> s1.term( + t -> t.field("firstName.keyword").value(firstName).boost(5.0f))) + .should(s2 -> s2 + .match(mm -> mm.field("firstName").query(firstName).boost(2.0f))) + .minimumShouldMatch("1"))); + } + + if (lastName != null && !lastName.trim().isEmpty()) { + b.must(m -> m.bool(bb -> bb + .should(s1 -> s1 + .term(t -> t.field("lastName.keyword").value(lastName).boost(5.0f))) + .should(s2 -> s2 + .match(mm -> mm.field("lastName").query(lastName).boost(2.0f))) + .minimumShouldMatch("1"))); + } + + // Exact match IDs in filter context + if (beneficiaryId != null && !beneficiaryId.trim().isEmpty()) { + b.filter(f -> f.term(t -> t.field("beneficiaryID").value(beneficiaryId))); + } + if (healthId != null && !healthId.trim().isEmpty()) { + b.filter(f -> f.bool(bb -> bb + .should(s1 -> s1.term(t -> t.field("healthID").value(healthId))) + .should(s2 -> s2.term(t -> t.field("abhaID").value(healthId))) + .minimumShouldMatch("1"))); + } + if (aadharNo != null && !aadharNo.trim().isEmpty()) { + b.filter(f -> f.term(t -> t.field("aadharNo").value(aadharNo))); + } + if (phoneNumber != null && !phoneNumber.trim().isEmpty()) { + b.must(m -> m.bool(bb -> bb + .should(s1 -> s1 + .term(t -> t.field("phoneNum").value(phoneNumber).boost(3.0f))) + .should(s2 -> s2 + .prefix(p -> p.field("phoneNum").value(phoneNumber).boost(2.0f))) + .minimumShouldMatch("1"))); + } + + return b; + })) + .size(100) + .sort(so -> so.score(sc -> sc.order(SortOrder.Desc))) + + , BeneficiariesESDTO.class); + + if (response.hits().hits().isEmpty()) { + return searchInDatabaseForAdvanced(firstName, lastName, genderId, dob, + stateId, districtId, blockId, villageId, fatherName, spouseName, + phoneNumber, beneficiaryId, healthId, aadharNo); + } + + return response.hits().hits().stream() + .map(hit -> mapESResultToExpectedFormat(hit.source())) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + } catch (Exception e) { + logger.error("ES advanced search failed: {}", e.getMessage()); + return searchInDatabaseForAdvanced(firstName, lastName, genderId, dob, + stateId, districtId, blockId, villageId, fatherName, spouseName, + phoneNumber, beneficiaryId, healthId, aadharNo); + } } -} + /** + * Database fallback for advanced search + */ + private List> searchInDatabaseForAdvanced( + String firstName, String lastName, Integer genderId, Date dob, + Integer stateId, Integer districtId, Integer blockId, Integer villageId, + String fatherName, String spouseName, String phoneNumber, + String beneficiaryId, String healthId, String aadharNo) { + + try { + List results = benDetailRepo.advancedSearchBeneficiaries( + firstName, lastName, genderId, dob, stateId, districtId, + blockId, fatherName, spouseName, phoneNumber, + beneficiaryId); + + return results.stream() + .map(this::mapToExpectedFormat) + .collect(Collectors.toList()); + + } catch (Exception e) { + logger.error("Database advanced search failed: {}", e.getMessage(), e); + return Collections.emptyList(); + } + } /** * Overloaded method without userId (backward compatibility) @@ -485,7 +396,7 @@ private List> searchInDatabaseForAdvanced( public List> universalSearch(String query) { return universalSearch(query, null); } - + /** * Get user location from database */ @@ -506,73 +417,73 @@ private Map getUserLocation(Integer userId) { } return null; } - + /** * Rank results by location match priority * Priority: Village > Block > District > State > No Match */ - private List> rankByLocation(List> results, - Map userLocation) { + private List> rankByLocation(List> results, + Map userLocation) { Integer userBlockId = userLocation.get("blockId"); Integer userVillageId = userLocation.get("villageId"); - + logger.info("Ranking by location - User Block: {}, Village: {}", userBlockId, userVillageId); - + return results.stream() - .sorted((r1, r2) -> { - int score1 = calculateLocationScore(r1, userBlockId, userVillageId); - int score2 = calculateLocationScore(r2, userBlockId, userVillageId); - return Integer.compare(score2, score1); - }) - .collect(Collectors.toList()); + .sorted((r1, r2) -> { + int score1 = calculateLocationScore(r1, userBlockId, userVillageId); + int score2 = calculateLocationScore(r2, userBlockId, userVillageId); + return Integer.compare(score2, score1); + }) + .collect(Collectors.toList()); } - + /** * Calculate location match score * Higher score = better match */ - private int calculateLocationScore(Map beneficiary, - Integer userBlockId, - Integer userVillageId) { + private int calculateLocationScore(Map beneficiary, + Integer userBlockId, + Integer userVillageId) { int score = 0; - + try { Map demographics = (Map) beneficiary.get("i_bendemographics"); if (demographics == null) { return score; } - + Integer currBlockId = getIntegerFromMap(demographics, "blockID"); Integer currVillageId = getIntegerFromMap(demographics, "m_districtblock", "blockID"); - + // Village match (highest priority) - score: 100 if (userVillageId != null && userVillageId.equals(currVillageId)) { score += 100; } - + // Block match - score: 50 if (userBlockId != null && userBlockId.equals(currBlockId)) { score += 50; } - + Integer permBlockId = getIntegerFromMap(beneficiary, "permBlockID"); Integer permVillageId = getIntegerFromMap(beneficiary, "permVillageID"); - + if (userVillageId != null && userVillageId.equals(permVillageId)) { - score += 75; + score += 75; } - + if (userBlockId != null && userBlockId.equals(permBlockId)) { - score += 25; + score += 25; } - + } catch (Exception e) { logger.error("Error calculating location score: {}", e.getMessage()); } - + return score; } - + /** * Helper to safely get Integer from nested maps */ @@ -587,7 +498,7 @@ private Integer getIntegerFromMap(Map map, String... keys) { } return value instanceof Integer ? (Integer) value : null; } - + /** * Map ES DTO directly to expected API format with COMPLETE data */ @@ -595,82 +506,142 @@ private Map mapESResultToExpectedFormat(BeneficiariesESDTO esDat if (esData == null) { return null; } - + + logger.info("ESDATA=" + esData.getAbhaID()); + Map result = new HashMap<>(); - + try { // Basic fields from ES result.put("beneficiaryRegID", esData.getBenRegId()); result.put("beneficiaryID", esData.getBeneficiaryID()); result.put("firstName", esData.getFirstName()); - result.put("lastName", esData.getLastName()); + result.put("lastName", esData.getLastName() != null ? esData.getLastName() : ""); result.put("genderID", esData.getGenderID()); result.put("genderName", esData.getGenderName()); result.put("dob", esData.getDOB()); result.put("dOB", esData.getDOB()); result.put("age", esData.getAge()); + result.put("actualAge", esData.getAge()); // NEW + result.put("ageUnits", "Years"); // NEW + result.put("fatherName", esData.getFatherName() != null ? esData.getFatherName() : ""); result.put("spouseName", esData.getSpouseName() != null ? esData.getSpouseName() : ""); + result.put("isHIVPos", esData.getIsHIVPos() != null ? esData.getIsHIVPos() : ""); + result.put("createdBy", esData.getCreatedBy()); result.put("createdDate", esData.getCreatedDate()); result.put("lastModDate", esData.getLastModDate()); result.put("benAccountID", esData.getBenAccountID()); - - result.put("healthID", esData.getHealthID()); - result.put("abhaID", esData.getAbhaID()); + + // Family ID - use both formats result.put("familyID", esData.getFamilyID()); - + result.put("familyId", esData.getFamilyID()); // NEW - alternate key + + // Marital status - NEW + result.put("maritalStatusName", null); // TODO: Add to ES if available + result.put("maritalStatus", new HashMap<>()); + + // Head of family - NEW (add these fields to BeneficiariesESDTO if available) + result.put("headOfFamily_RelationID", null); + result.put("headOfFamily_Relation", null); + + // ABHA Details + List> abhaDetails = new ArrayList<>(); + if (esData.getHealthID() != null || esData.getAbhaID() != null) { + Map abhaDetail = new HashMap<>(); + abhaDetail.put("healthIDNumber", esData.getAbhaID()); + abhaDetail.put("healthID", esData.getAbhaID()); + if (esData.getAbhaCreatedDate() != null) { + + String dateStr = esData.getAbhaCreatedDate(); + ZoneId zoneId = ZoneId.of("Asia/Kolkata"); + long createdDateMillis; + + if (dateStr.length() == 10) { + // yyyy-MM-dd + createdDateMillis = LocalDate.parse(dateStr) + .atStartOfDay(zoneId) + .toInstant() + .toEpochMilli(); + } else { + // yyyy-MM-dd HH:mm:ss OR yyyy-MM-dd HH:mm:ss.S + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.S]"); + + createdDateMillis = LocalDateTime.parse(dateStr, formatter) + .atZone(zoneId) + .toInstant() + .toEpochMilli(); + } + + abhaDetail.put("createdDate", createdDateMillis); + } + abhaDetail.put("beneficiaryRegID", esData.getBenRegId()); + abhaDetails.add(abhaDetail); + } + result.put("abhaDetails", abhaDetails); + + // Gender object Map mGender = new HashMap<>(); mGender.put("genderID", esData.getGenderID()); mGender.put("genderName", esData.getGenderName()); result.put("m_gender", mGender); - + + // Demographics - UPDATED with all fields Map demographics = new HashMap<>(); demographics.put("beneficiaryRegID", esData.getBenRegId()); + + // Current address demographics.put("stateID", esData.getStateID()); demographics.put("stateName", esData.getStateName()); demographics.put("districtID", esData.getDistrictID()); demographics.put("districtName", esData.getDistrictName()); demographics.put("blockID", esData.getBlockID()); demographics.put("blockName", esData.getBlockName()); - demographics.put("villageID", esData.getVillageID()); - demographics.put("villageName", esData.getVillageName()); - demographics.put("districtBranchID", null); - demographics.put("districtBranchName", null); + demographics.put("villageID", esData.getVillageID()); // FIXED + demographics.put("villageName", esData.getVillageName()); // FIXED + + demographics.put("districtBranchID", esData.getVillageID()); + demographics.put("districtBranchName", esData.getVillageName()); demographics.put("parkingPlaceID", esData.getParkingPlaceID()); demographics.put("servicePointID", esData.getServicePointID()); demographics.put("servicePointName", esData.getServicePointName()); demographics.put("createdBy", esData.getCreatedBy()); - + + // State object Map mState = new HashMap<>(); mState.put("stateID", esData.getStateID()); mState.put("stateName", esData.getStateName()); mState.put("stateCode", null); mState.put("countryID", 1); demographics.put("m_state", mState); - + + // District object Map mDistrict = new HashMap<>(); mDistrict.put("districtID", esData.getDistrictID()); mDistrict.put("districtName", esData.getDistrictName()); mDistrict.put("stateID", esData.getStateID()); demographics.put("m_district", mDistrict); - + + // Block object Map mBlock = new HashMap<>(); mBlock.put("blockID", esData.getBlockID()); mBlock.put("blockName", esData.getBlockName()); mBlock.put("districtID", esData.getDistrictID()); mBlock.put("stateID", esData.getStateID()); demographics.put("m_districtblock", mBlock); - + + // Branch mapping object Map mBranch = new HashMap<>(); mBranch.put("districtBranchID", null); mBranch.put("blockID", esData.getBlockID()); - mBranch.put("villageName", esData.getVillageName()); + mBranch.put("villageName", esData.getVillageName()); // FIXED mBranch.put("pinCode", esData.getPinCode()); demographics.put("m_districtbranchmapping", mBranch); - + result.put("i_bendemographics", demographics); - + + // Phone maps List> benPhoneMaps = new ArrayList<>(); if (esData.getPhoneNum() != null && !esData.getPhoneNum().isEmpty()) { Map phoneMap = new HashMap<>(); @@ -679,19 +650,18 @@ private Map mapESResultToExpectedFormat(BeneficiariesESDTO esDat phoneMap.put("parentBenRegID", esData.getBenRegId()); phoneMap.put("benRelationshipID", 1); phoneMap.put("phoneNo", esData.getPhoneNum()); - + Map relationType = new HashMap<>(); relationType.put("benRelationshipID", 1); relationType.put("benRelationshipType", "Self"); phoneMap.put("benRelationshipType", relationType); - + benPhoneMaps.add(phoneMap); } result.put("benPhoneMaps", benPhoneMaps); - + + // Boolean flags result.put("isConsent", false); - result.put("m_title", new HashMap<>()); - result.put("maritalStatus", new HashMap<>()); result.put("changeInSelfDetails", false); result.put("changeInAddress", false); result.put("changeInContacts", false); @@ -704,39 +674,43 @@ private Map mapESResultToExpectedFormat(BeneficiariesESDTO esDat result.put("is1097", false); result.put("emergencyRegistration", false); result.put("passToNurse", false); + + // Empty objects/arrays + result.put("m_title", new HashMap<>()); + result.put("maritalStatus", new HashMap<>()); result.put("beneficiaryIdentities", new ArrayList<>()); - + } catch (Exception e) { logger.error("Error mapping ES result: {}", e.getMessage(), e); return null; } - + return result; } - + /** * Direct database search as fallback */ private List> searchInDatabaseDirectly(String query) { try { List results = benDetailRepo.searchBeneficiaries(query); - + return results.stream() - .map(this::mapToExpectedFormat) - .collect(Collectors.toList()); - + .map(this::mapToExpectedFormat) + .collect(Collectors.toList()); + } catch (Exception e) { logger.error("Database search failed: {}", e.getMessage(), e); return Collections.emptyList(); } } - + /** * Map database result to expected API format */ private Map mapToExpectedFormat(Object[] row) { Map result = new HashMap<>(); - + try { Long beneficiaryRegID = getLong(row[0]); String beneficiaryID = getString(row[1]); @@ -753,7 +727,7 @@ private Map mapToExpectedFormat(Object[] row) { Date createdDate = getDate(row[12]); Long lastModDate = getLong(row[13]); Long benAccountID = getLong(row[14]); - + Integer stateID = getInteger(row[15]); String stateName = getString(row[16]); Integer districtID = getInteger(row[17]); @@ -765,28 +739,38 @@ private Map mapToExpectedFormat(Object[] row) { String servicePointName = getString(row[23]); Integer parkingPlaceID = getInteger(row[24]); String phoneNum = getString(row[25]); - + + Integer villageID = getInteger(row[26]); + String villageName = getString(row[27]); + result.put("beneficiaryRegID", beneficiaryRegID); result.put("beneficiaryID", beneficiaryID); result.put("firstName", firstName); - result.put("lastName", lastName); + result.put("lastName", lastName != null ? lastName : ""); result.put("genderID", genderID); result.put("genderName", genderName); result.put("dOB", dob); result.put("dob", dob); result.put("age", age); + result.put("actualAge", age); // NEW + result.put("ageUnits", "Years"); // NEW result.put("fatherName", fatherName != null ? fatherName : ""); result.put("spouseName", spouseName != null ? spouseName : ""); + result.put("isHIVPos", isHIVPos != null ? isHIVPos : ""); result.put("createdBy", createdBy); result.put("createdDate", createdDate); result.put("lastModDate", lastModDate); result.put("benAccountID", benAccountID); - + + // NEW fields + result.put("maritalStatusName", null); + result.put("maritalStatus", new HashMap<>()); + Map mGender = new HashMap<>(); mGender.put("genderID", genderID); mGender.put("genderName", genderName); result.put("m_gender", mGender); - + Map demographics = new HashMap<>(); demographics.put("beneficiaryRegID", beneficiaryRegID); demographics.put("stateID", stateID); @@ -795,45 +779,47 @@ private Map mapToExpectedFormat(Object[] row) { demographics.put("districtName", districtName); demographics.put("blockID", blockID); demographics.put("blockName", blockName); - demographics.put("districtBranchID", null); - demographics.put("districtBranchName", null); + demographics.put("villageID", villageID); + demographics.put("villageName", villageName); + demographics.put("districtBranchID", villageID); + demographics.put("districtBranchName", villageName); demographics.put("parkingPlaceID", parkingPlaceID); demographics.put("servicePointID", servicePointID); demographics.put("servicePointName", servicePointName); demographics.put("createdBy", createdBy); - + Map mState = new HashMap<>(); mState.put("stateID", stateID); mState.put("stateName", stateName); mState.put("stateCode", null); mState.put("countryID", 1); demographics.put("m_state", mState); - + Map mDistrict = new HashMap<>(); mDistrict.put("districtID", districtID); mDistrict.put("districtName", districtName); mDistrict.put("stateID", stateID); demographics.put("m_district", mDistrict); - + Map mBlock = new HashMap<>(); mBlock.put("blockID", blockID); mBlock.put("blockName", blockName); mBlock.put("districtID", districtID); mBlock.put("stateID", stateID); demographics.put("m_districtblock", mBlock); - + Map mBranch = new HashMap<>(); mBranch.put("districtBranchID", null); mBranch.put("blockID", blockID); - mBranch.put("villageName", null); + mBranch.put("villageName", villageName); // FIXED mBranch.put("pinCode", pinCode); demographics.put("m_districtbranchmapping", mBranch); - + result.put("i_bendemographics", demographics); - + List> benPhoneMaps = fetchPhoneNumbers(beneficiaryRegID); result.put("benPhoneMaps", benPhoneMaps); - + result.put("isConsent", false); result.put("m_title", new HashMap<>()); result.put("maritalStatus", new HashMap<>()); @@ -850,28 +836,28 @@ private Map mapToExpectedFormat(Object[] row) { result.put("emergencyRegistration", false); result.put("passToNurse", false); result.put("beneficiaryIdentities", new ArrayList<>()); - + } catch (Exception e) { logger.error("Error mapping result: {}", e.getMessage(), e); } - + return result; } - + /** * Fetch phone numbers for a beneficiary */ private List> fetchPhoneNumbers(Long beneficiaryRegID) { List> phoneList = new ArrayList<>(); - + try { List phones = benDetailRepo.findPhoneNumbersByBeneficiaryId(beneficiaryRegID); - + int mapId = 1; for (Object[] phone : phones) { String phoneNo = getString(phone[0]); String phoneType = getString(phone[1]); - + if (phoneNo != null && !phoneNo.isEmpty()) { Map phoneMap = new HashMap<>(); phoneMap.put("benPhMapID", (long) mapId++); @@ -879,33 +865,38 @@ private List> fetchPhoneNumbers(Long beneficiaryRegID) { phoneMap.put("parentBenRegID", beneficiaryRegID); phoneMap.put("benRelationshipID", 1); phoneMap.put("phoneNo", phoneNo); - + Map relationType = new HashMap<>(); relationType.put("benRelationshipID", 1); relationType.put("benRelationshipType", phoneType != null ? phoneType : "Self"); phoneMap.put("benRelationshipType", relationType); - + phoneList.add(phoneMap); } } } catch (Exception e) { logger.error("Error fetching phone numbers: {}", e.getMessage(), e); } - + return phoneList; } - + // Helper methods private String getString(Object value) { - if (value == null) return null; + if (value == null) + return null; return value.toString(); } - + private Long getLong(Object value) { - if (value == null) return null; - if (value instanceof Long) return (Long) value; - if (value instanceof Integer) return ((Integer) value).longValue(); - if (value instanceof BigDecimal) return ((BigDecimal) value).longValue(); + if (value == null) + return null; + if (value instanceof Long) + return (Long) value; + if (value instanceof Integer) + return ((Integer) value).longValue(); + if (value instanceof BigDecimal) + return ((BigDecimal) value).longValue(); if (value instanceof String) { try { return Long.parseLong((String) value); @@ -915,12 +906,16 @@ private Long getLong(Object value) { } return null; } - + private Integer getInteger(Object value) { - if (value == null) return null; - if (value instanceof Integer) return (Integer) value; - if (value instanceof Long) return ((Long) value).intValue(); - if (value instanceof BigDecimal) return ((BigDecimal) value).intValue(); + if (value == null) + return null; + if (value instanceof Integer) + return (Integer) value; + if (value instanceof Long) + return ((Long) value).intValue(); + if (value instanceof BigDecimal) + return ((BigDecimal) value).intValue(); if (value instanceof String) { try { return Integer.parseInt((String) value); @@ -930,12 +925,16 @@ private Integer getInteger(Object value) { } return null; } - + private Date getDate(Object value) { - if (value == null) return null; - if (value instanceof Date) return (Date) value; - if (value instanceof Timestamp) return new Date(((Timestamp) value).getTime()); - if (value instanceof java.sql.Date) return new Date(((java.sql.Date) value).getTime()); + if (value == null) + return null; + if (value instanceof Date) + return (Date) value; + if (value instanceof Timestamp) + return new Date(((Timestamp) value).getTime()); + if (value instanceof java.sql.Date) + return new Date(((java.sql.Date) value).getTime()); return null; } } \ No newline at end of file diff --git a/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchSyncService.java b/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchSyncService.java index 8f40e37..0f80c12 100644 --- a/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchSyncService.java +++ b/src/main/java/com/iemr/common/identity/service/elasticsearch/ElasticsearchSyncService.java @@ -12,15 +12,12 @@ import org.springframework.stereotype.Service; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.Refresh; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import com.iemr.common.identity.data.elasticsearch.BeneficiaryDocument; -import com.iemr.common.identity.dto.BenDetailDTO; -import com.iemr.common.identity.dto.BeneficiariesDTO; -import com.iemr.common.identity.repo.BenMappingRepo; -import com.iemr.common.identity.service.elasticsearch.BeneficiaryDataService; /** * Service to synchronize beneficiary data from database to Elasticsearch @@ -29,36 +26,35 @@ public class ElasticsearchSyncService { private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSyncService.class); - private static final int BATCH_SIZE = 100; // Reduced to 100 for better connection management - private static final int ES_BULK_SIZE = 50; // Reduced to 50 for better performance - private static final int PAUSE_AFTER_BATCHES = 5; // Pause after every 5 batches - @Autowired - private ElasticsearchClient esClient; + // OPTIMIZED BATCH SIZES for maximum performance + private static final int DB_FETCH_SIZE = 5000; // Fetch 5000 IDs at once + private static final int ES_BULK_SIZE = 2000; // Index 2000 documents per bulk request @Autowired - private BenMappingRepo mappingRepo; + private ElasticsearchClient esClient; @Autowired - private BeneficiaryDataService beneficiaryDataService; + private BeneficiaryTransactionHelper transactionalWrapper; @Autowired - private BeneficiaryTransactionHelper transactionalWrapper; + private BeneficiaryDocumentDataService documentDataService; // KEY: Batch service with ABHA @Value("${elasticsearch.index.beneficiary}") private String beneficiaryIndex; /** - * Sync all beneficiaries from database to Elasticsearch - * This should be run as a one-time operation or scheduled job + * Sync all beneficiaries using BATCH queries WITH ABHA + * This replaces individual queries with batch fetching (50-100x faster) */ public SyncResult syncAllBeneficiaries() { - logger.info("Starting full beneficiary sync to Elasticsearch..."); + logger.info("STARTING OPTIMIZED BATCH SYNC WITH ABHA"); SyncResult result = new SyncResult(); + long startTime = System.currentTimeMillis(); try { - // Get total count using transactional wrapper + // Get total count long totalCount = transactionalWrapper.countActiveBeneficiaries(); logger.info("Total beneficiaries to sync: {}", totalCount); @@ -68,104 +64,77 @@ public SyncResult syncAllBeneficiaries() { } AtomicInteger processedCount = new AtomicInteger(0); + AtomicInteger abhaEnrichedCount = new AtomicInteger(0); int offset = 0; - int batchCounter = 0; - List esBatch = new ArrayList<>(); + List esBatch = new ArrayList<>(ES_BULK_SIZE); - // Process in batches + // Process in large chunks for maximum speed while (offset < totalCount) { - logger.info("Fetching batch: offset={}, limit={}", offset, BATCH_SIZE); - - List batchIds = null; + long chunkStart = System.currentTimeMillis(); - try { - // Use transactional wrapper to get fresh connection for each batch - batchIds = transactionalWrapper.getBeneficiaryIdsBatch(offset, BATCH_SIZE); - } catch (Exception e) { - logger.error("Error fetching batch from database: {}", e.getMessage()); - // Wait and retry once - try { - Thread.sleep(2000); - batchIds = transactionalWrapper.getBeneficiaryIdsBatch(offset, BATCH_SIZE); - } catch (Exception e2) { - logger.error("Retry failed: {}", e2.getMessage()); - result.setError("Database connection error: " + e2.getMessage()); - break; - } - } + // STEP 1: Fetch IDs in batch + List batchIds = fetchBatchWithRetry(offset, DB_FETCH_SIZE); if (batchIds == null || batchIds.isEmpty()) { - logger.info("No more records to process. Breaking loop."); + logger.info("No more records to process."); break; } - logger.info("Processing {} beneficiaries in current batch", batchIds.size()); + logger.info("Fetched {} IDs at offset {}", batchIds.size(), offset); - for (Object[] benIdObj : batchIds) { - try { - - Object idObj = benIdObj[0]; - BigInteger benRegId; + // STEP 2: Convert IDs to BigInteger list + List benRegIds = new ArrayList<>(batchIds.size()); + for (Object[] idRow : batchIds) { + BigInteger benRegId = convertToBigInteger(idRow[0]); + if (benRegId != null) { + benRegIds.add(benRegId); + } + } - if (idObj instanceof BigInteger) { - benRegId = (BigInteger) idObj; - } else if (idObj instanceof Long) { - benRegId = BigInteger.valueOf((Long) idObj); - } else { - throw new IllegalArgumentException( - "Unsupported benRegId type: " + idObj.getClass()); + // STEP 3: BATCH FETCH complete data WITH ABHA (CRITICAL OPTIMIZATION) + // This single call replaces thousands of individual database queries + logger.info("Batch fetching complete data with ABHA for {} beneficiaries...", benRegIds.size()); + List documents = documentDataService.getBeneficiariesBatch(benRegIds); + logger.info("Retrieved {} complete documents", documents.size()); + + // STEP 4: Count ABHA enriched documents and add to ES batch + for (BeneficiaryDocument doc : documents) { + if (doc != null && doc.getBenId() != null) { + + // Track ABHA enrichment + if (doc.getHealthID() != null || doc.getAbhaID() != null) { + abhaEnrichedCount.incrementAndGet(); + logger.debug("Document {} has ABHA: healthID={}, abhaID={}", + doc.getBenId(), doc.getHealthID(), doc.getAbhaID()); } - // Fetch beneficiary details DIRECTLY from database - BeneficiariesDTO benDTO = beneficiaryDataService.getBeneficiaryFromDatabase(benRegId); - - if (benDTO != null) { - BeneficiaryDocument doc = convertToDocument(benDTO); - - if (doc != null && doc.getBenId() != null) { - esBatch.add(doc); - - // Send to ES when batch is full - if (esBatch.size() >= ES_BULK_SIZE) { - int indexed = bulkIndexDocuments(esBatch); - result.addSuccess(indexed); - result.addFailure(esBatch.size() - indexed); - - int current = processedCount.addAndGet(esBatch.size()); - logger.info("Progress: {}/{} ({} %) - Indexed: {}, Failed: {}", - current, totalCount, - String.format("%.2f", (current * 100.0) / totalCount), - indexed, esBatch.size() - indexed); - - esBatch.clear(); - } - } else { - logger.warn("Skipping beneficiary with null benId: benRegId={}", benRegId); - result.addFailure(); - } - } else { - logger.warn("No details found for benRegId: {}", benRegId); - result.addFailure(); - } + esBatch.add(doc); + + // Bulk index when batch is full + if (esBatch.size() >= ES_BULK_SIZE) { + int indexed = bulkIndexDocuments(esBatch); + result.addSuccess(indexed); + result.addFailure(esBatch.size() - indexed); + + int current = processedCount.addAndGet(esBatch.size()); + logProgress(current, totalCount, abhaEnrichedCount.get(), startTime); - } catch (Exception e) { - logger.error("Error processing beneficiary in batch: {}", e.getMessage(), e); + esBatch.clear(); + } + } else { result.addFailure(); } } - offset += BATCH_SIZE; - batchCounter++; + long chunkTime = System.currentTimeMillis() - chunkStart; + logger.info("Chunk processed in {}ms ({} docs/sec)", + chunkTime, + (batchIds.size() * 1000) / Math.max(chunkTime, 1)); - // Pause after every N batches to let connections stabilize - if (batchCounter % PAUSE_AFTER_BATCHES == 0) { - logger.info("Completed {} batches. Pausing for 2 seconds...", batchCounter); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - logger.warn("Sleep interrupted: {}", e.getMessage()); - } - } + offset += DB_FETCH_SIZE; + + // Brief pause to prevent overwhelming the system + Thread.sleep(50); } // Index remaining documents @@ -177,15 +146,21 @@ public SyncResult syncAllBeneficiaries() { processedCount.addAndGet(esBatch.size()); } - logger.info("Sync completed successfully!"); + long totalTime = System.currentTimeMillis() - startTime; + double docsPerSecond = (processedCount.get() * 1000.0) / totalTime; + + logger.info("SYNC COMPLETED SUCCESSFULLY!"); logger.info("Total Processed: {}", processedCount.get()); logger.info("Successfully Indexed: {}", result.getSuccessCount()); logger.info("Failed: {}", result.getFailureCount()); + logger.info("ABHA Enriched: {} ({} %)", + abhaEnrichedCount.get(), + String.format("%.2f", (abhaEnrichedCount.get() * 100.0) / processedCount.get())); + logger.info("Total Time: {} seconds ({} minutes)", totalTime / 1000, totalTime / 60000); + logger.info("Throughput: {:.2f} documents/second", docsPerSecond); } catch (Exception e) { - logger.error("========================================"); - logger.error("CRITICAL ERROR during full sync: {}", e.getMessage(), e); - logger.error("========================================"); + logger.error("CRITICAL ERROR during sync: {}", e.getMessage(), e); result.setError(e.getMessage()); } @@ -193,64 +168,61 @@ public SyncResult syncAllBeneficiaries() { } /** - * Sync a single beneficiary by BenRegId - * Uses direct database access to avoid Elasticsearch circular dependency + * Fetch batch with automatic retry on connection errors */ - public boolean syncSingleBeneficiary(String benRegId) { - try { - - BigInteger benRegIdBig = new BigInteger(benRegId); - - // Check if beneficiary exists in database first using transactional wrapper - boolean exists = transactionalWrapper.existsByBenRegId(benRegIdBig); - if (!exists) { - logger.error("Beneficiary does not exist in database}"); - return false; - } - - logger.info("Beneficiary exists in database. Fetching details..."); - - // Get beneficiary DIRECTLY from database (not through IdentityService) - BeneficiariesDTO benDTO = beneficiaryDataService.getBeneficiaryFromDatabase(benRegIdBig); - - if (benDTO == null) { - logger.error("Failed to fetch beneficiary details from database"); - return false; - } - - logger.info("Beneficiary details fetched successfully"); - logger.info("BenRegId: {}, Name: {} {}", - benDTO.getBenRegId(), - benDTO.getBeneficiaryDetails() != null ? benDTO.getBeneficiaryDetails().getFirstName() : "N/A", - benDTO.getBeneficiaryDetails() != null ? benDTO.getBeneficiaryDetails().getLastName() : "N/A"); - - // Convert to Elasticsearch document - BeneficiaryDocument doc = convertToDocument(benDTO); - - if (doc == null || doc.getBenId() == null) { - logger.error("Failed to convert beneficiary to document"); - return false; + private List fetchBatchWithRetry(int offset, int limit) { + int maxRetries = 3; + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + return transactionalWrapper.getBeneficiaryIdsBatch(offset, limit); + } catch (Exception e) { + logger.warn("Database fetch error (attempt {}/{}): {}", attempt, maxRetries, e.getMessage()); + if (attempt == maxRetries) { + throw new RuntimeException("Failed to fetch batch after " + maxRetries + " attempts", e); + } + try { + Thread.sleep(1000 * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during retry", ie); + } } + } + return null; + } - logger.info("Document created. Indexing to Elasticsearch..."); - logger.info("Document ID: {}, Index: {}", doc.getBenId(), beneficiaryIndex); - - // Index to Elasticsearch - esClient.index(i -> i - .index(beneficiaryIndex) - .id(doc.getBenId()) - .document(doc)); - - logger.info("SUCCESS! Beneficiary synced to Elasticsearch"); - - return true; - - } catch (Exception e) { - logger.error("ERROR syncing beneficiary {}: {}", benRegId, e.getMessage(), e); - return false; + /** + * Convert various ID types to BigInteger + */ + private BigInteger convertToBigInteger(Object idObj) { + if (idObj instanceof BigInteger) { + return (BigInteger) idObj; + } else if (idObj instanceof Long) { + return BigInteger.valueOf((Long) idObj); + } else if (idObj instanceof Integer) { + return BigInteger.valueOf((Integer) idObj); + } else { + logger.warn("Unsupported ID type: {}", idObj != null ? idObj.getClass() : "null"); + return null; } } + /** + * Progress logging with ETA and ABHA count + */ + private void logProgress(int current, long total, int abhaCount, long startTime) { + double progress = (current * 100.0) / total; + long elapsed = System.currentTimeMillis() - startTime; + long estimatedTotal = (long) (elapsed / (progress / 100.0)); + long remaining = estimatedTotal - elapsed; + + logger.info("Progress: {}/{} ({:.2f}%) | ABHA: {} | Elapsed: {}m | ETA: {}m | Speed: {:.0f} docs/sec", + current, total, progress, abhaCount, + elapsed / 60000, + remaining / 60000, + (current * 1000.0) / elapsed); + } + /** * Bulk index documents to Elasticsearch */ @@ -277,11 +249,9 @@ private int bulkIndexDocuments(List documents) { int successCount = 0; if (result.errors()) { - logger.warn("Bulk indexing had some errors"); for (BulkResponseItem item : result.items()) { if (item.error() != null) { - logger.error("Error indexing document {}: {}", - item.id(), item.error().reason()); + logger.error("Error indexing document {}: {}", item.id(), item.error().reason()); } else { successCount++; } @@ -299,62 +269,54 @@ private int bulkIndexDocuments(List documents) { } /** - * Convert BeneficiariesDTO to BeneficiaryDocument + * Sync a single beneficiary with ABHA */ - private BeneficiaryDocument convertToDocument(BeneficiariesDTO dto) { - if (dto == null) { - logger.warn("Cannot convert null DTO to document"); - return null; - } - + public boolean syncSingleBeneficiary(String benRegId) { try { - BeneficiaryDocument doc = new BeneficiaryDocument(); - - // BenId (use benRegId as primary identifier) - if (dto.getBenRegId() != null) { - BigInteger benRegId = (BigInteger) dto.getBenRegId(); - doc.setBenId(benRegId.toString()); - doc.setBenRegId(benRegId.longValue()); - } else if (dto.getBenId() != null) { - doc.setBenId(dto.getBenId().toString()); - if (dto.getBenId() instanceof BigInteger) { - doc.setBenRegId(((BigInteger) dto.getBenId()).longValue()); - } - } else { - logger.warn("Beneficiary has no valid ID!"); - return null; + BigInteger benRegIdBig = new BigInteger(benRegId); + + // Check existence + boolean exists = transactionalWrapper.existsByBenRegId(benRegIdBig); + if (!exists) { + logger.error("Beneficiary does not exist in database: {}", benRegId); + return false; } - // Phone number - doc.setPhoneNum(dto.getPreferredPhoneNum()); + logger.info("Beneficiary exists in database. Fetching details with ABHA..."); - // Beneficiary Details (from nested DTO) - if (dto.getBeneficiaryDetails() != null) { - BenDetailDTO benDetails = dto.getBeneficiaryDetails(); - doc.setFirstName(benDetails.getFirstName()); - doc.setLastName(benDetails.getLastName()); - doc.setAge(benDetails.getBeneficiaryAge()); - doc.setGender(benDetails.getGender()); + // Fetch document using batch service (includes ABHA) + BeneficiaryDocument doc = documentDataService.getBeneficiaryFromDatabase(benRegIdBig); + + if (doc == null || doc.getBenId() == null) { + logger.error("Failed to fetch beneficiary document"); + return false; } - logger.debug("Successfully converted DTO to document: benId={}", doc.getBenId()); - return doc; + logger.info("Document created with ABHA. healthID={}, abhaID={}", + doc.getHealthID(), doc.getAbhaID()); + + // Index to Elasticsearch + esClient.index(i -> i + .index(beneficiaryIndex) + .id(doc.getBenId()) + .document(doc).refresh(Refresh.True)); + + logger.info("SUCCESS! Beneficiary {} synced to Elasticsearch with ABHA", benRegId); + return true; } catch (Exception e) { - logger.error("Error converting DTO to document: {}", e.getMessage(), e); - return null; + logger.error("ERROR syncing beneficiary {}: {}", benRegId, e.getMessage(), e); + return false; } } /** - * Verify sync by checking document count + * Check sync status */ public SyncStatus checkSyncStatus() { try { long dbCount = transactionalWrapper.countActiveBeneficiaries(); - - long esCount = esClient.count(c -> c - .index(beneficiaryIndex)).count(); + long esCount = esClient.count(c -> c.index(beneficiaryIndex)).count(); SyncStatus status = new SyncStatus(); status.setDatabaseCount(dbCount); @@ -363,7 +325,6 @@ public SyncStatus checkSyncStatus() { status.setMissingCount(dbCount - esCount); logger.info("Sync Status - DB: {}, ES: {}, Missing: {}", dbCount, esCount, dbCount - esCount); - return status; } catch (Exception e) { @@ -412,17 +373,11 @@ public void setError(String error) { @Override public String toString() { - return "SyncResult{" + - "successCount=" + successCount + - ", failureCount=" + failureCount + - ", error='" + error + '\'' + - '}'; + return "SyncResult{successCount=" + successCount + ", failureCount=" + failureCount + + ", error='" + error + "'}"; } } - /** - * Status class to track sync verification - */ public static class SyncStatus { private long databaseCount; private long elasticsearchCount; @@ -472,13 +427,8 @@ public void setError(String error) { @Override public String toString() { - return "SyncStatus{" + - "databaseCount=" + databaseCount + - ", elasticsearchCount=" + elasticsearchCount + - ", synced=" + synced + - ", missingCount=" + missingCount + - ", error='" + error + '\'' + - '}'; + return "SyncStatus{databaseCount=" + databaseCount + ", elasticsearchCount=" + elasticsearchCount + + ", synced=" + synced + ", missingCount=" + missingCount + ", error='" + error + "'}"; } } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 22a9ff6..5001e41 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -21,28 +21,33 @@ spring.datasource.hikari.data-source-properties.tcpKeepAlive=true # spring.datasource.hikari.test-on-borrow=true # Reduce batch size for faster feedback -spring.jpa.properties.hibernate.jdbc.batch_size=50 +spring.jpa.properties.hibernate.jdbc.fetch_size=10000 +spring.jpa.properties.hibernate.jdbc.batch_size=5000 spring.jpa.properties.hibernate.order_inserts=true spring.jpa.properties.hibernate.order_updates=true # Query timeout -spring.jpa.properties.hibernate.query.timeout=30000 +spring.jpa.properties.hibernate.query.timeout=60000 -spring.datasource.hikari.maximum-pool-size=20 -spring.datasource.hikari.minimum-idle=10 +spring.datasource.hikari.maximum-pool-size=30 +spring.datasource.hikari.minimum-idle=15 spring.datasource.hikari.connection-timeout=30000 spring.datasource.hikari.idle-timeout=300000 spring.datasource.hikari.max-lifetime=600000 -spring.datasource.hikari.keepalive-tquickime=120000 -spring.datasource.hikari.validationTimeout=5000 +spring.datasource.hikari.keepalive-time=120000 +spring.datasource.hikari.validation-timeout=5000 +spring.datasource.hikari.leak-detection-threshold=60000 -spring.datasource.hikari.health-check-properties.mysql5Validation=true -spring.datasource.hikari.data-source-properties.cachePrepStmts=true -spring.datasource.hikari.data-source-properties.prepStmtCacheSize=250 -spring.datasource.hikari.data-source-properties.prepStmtCacheSqlLimit=2048 +ring.datasource.hikari.data-source-properties.cachePrepStmts=true +spring.datasource.hikari.data-source-properties.prepStmtCacheSize=500 +spring.datasource.hikari.data-source-properties.prepStmtCacheSqlLimit=4096 spring.datasource.hikari.data-source-properties.useServerPrepStmts=true +spring.datasource.hikari.data-source-properties.autoReconnect=true spring.datasource.hikari.data-source-properties.tcpKeepAlive=true +spring.datasource.hikari.data-source-properties.useSSL=false + +spring.datasource.hikari.health-check-properties.mysql5Validation=true #Below lines are added for security reasons spring.session.store-type=redis @@ -75,11 +80,47 @@ spring.main.allow-circular-references=true jwt.access.expiration=86400000 jwt.refresh.expiration=604800000 -# Connection pool settings -elasticsearch.connection.timeout=5000 -elasticsearch.socket.timeout=60000 +# Connection timeouts +elasticsearch.connection.timeout=10000 +elasticsearch.socket.timeout=120000 +elasticsearch.max.retry.timeout=120000 + +# Connection pooling - INCREASED for bulk operations +elasticsearch.max.connections=200 +elasticsearch.max.connections.per.route=100 + +# Request configuration +elasticsearch.request.timeout=60000 +elasticsearch.max.result.window=10000 + +# Bulk indexing - OPTIMIZED for maximum throughput +elasticsearch.bulk.size=2000 +elasticsearch.bulk.concurrent.requests=8 +elasticsearch.bulk.flush.interval=30s + +# Search performance (unchanged - maintains fast search) +elasticsearch.search.default.size=100 +elasticsearch.search.max.size=500 +elasticsearch.search.timeout=5s + +# Query cache (for search performance) +elasticsearch.query.cache.enabled=true +elasticsearch.query.cache.size=10% + +# Request cache +elasticsearch.request.cache.enabled=true + +# Circuit breaker +elasticsearch.circuit.breaker.enabled=true +elasticsearch.circuit.breaker.limit=95% + +# Async operations thread pool +elasticsearch.async.thread.pool.size=20 +elasticsearch.async.thread.pool.queue.size=5000 -# Logging +# LOGGING (Reduce noise during sync) logging.level.com.iemr.common.identity.service.elasticsearch=INFO logging.level.org.elasticsearch=WARN +logging.level.org.hibernate.SQL=WARN +logging.level.com.zaxxer.hikari=INFO