Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #807

Merged
merged 18 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@Configuration
//@Configuration
public class KafkaConfiguration {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,7 @@ public class WSCalculationConfiguration {

@Value("${egov.update.demand.add.penalty}")
private String updateAddPenaltytopic;

@Value("${ws.generate.demand.bulk}")
private String wsGenerateDemandBulktopic;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class BillingNotificationConsumer {
*/
@KafkaListener(topics = { "${kafka.topics.billgen.topic}" })
public void listen(final HashMap<String, Object> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Consuming record: " + record);
// paymentService.process(record, topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void listen(final List<Message<?>> records) {
* @param records failed batch processing
*/
@KafkaListener(topics = {
"${persister.demand.based.dead.letter.topic.batch}" }, containerFactory = "kafkaListenerContainerFactory")
"${persister.demand.based.dead.letter.topic.batch}" })
public void listenDeadLetterTopic(final List<Message<?>> records) {
CalculationReq calculationReq = mapper.convertValue(records.get(0).getPayload(), CalculationReq.class);
Map<String, Object> masterMap = mDataService.loadMasterData(calculationReq.getRequestInfo(),
Expand Down Expand Up @@ -192,12 +192,12 @@ private void generateDemandInBatch(CalculationReq request, Map<String, Object> m
wsCalulationWorkflowValidator.applicationValidation(request.getRequestInfo(), criteria.getTenantId(),
criteria.getConnectionNo(), genratedemand);
}
System.out.println("Calling Bulk Demand generation");
System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
wSCalculationServiceImpl.bulkDemandGeneration(request, masterMap);
String connectionNoStrings = request.getCalculationCriteria().stream()
/*String connectionNoStrings = request.getCalculationCriteria().stream()
.map(criteria -> criteria.getConnectionNo()).collect(Collectors.toSet()).toString();
StringBuilder str = new StringBuilder("Demand generated Successfully. For records : ")
.append(connectionNoStrings);
.append(connectionNoStrings);*/
// producer.push(errorTopic, request);
// remove the try catch or throw the exception to the previous method to catch it.

Expand All @@ -208,7 +208,7 @@ private void generateDemandInBatch(CalculationReq request, Map<String, Object> m
* @param tenantId TenantId for getting master data.
*/
@KafkaListener(topics = {
"${egov.wscal.bulk.demand.schedular.topic}" }, containerFactory = "kafkaListenerContainerFactory")
"${egov.wscal.bulk.demand.schedular.topic}" })
public void generateDemandForTenantId(HashMap<Object, Object> messageData) {
String tenantId;
RequestInfo requestInfo;
Expand Down Expand Up @@ -272,8 +272,6 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t

List<String> connectionNos = waterCalculatorDao.getNonMeterConnectionsList(tenantId, dayStartTime, dayEndTime);

List<String> meteredConnectionNos = waterCalculatorDao.getConnectionsNoList(tenantId,
WSCalculationConstant.meteredConnectionType);


Calendar previousFromDate = Calendar.getInstance();
Expand All @@ -286,11 +284,16 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo,
tenantId);

String assessmentYear = estimationService.getAssessmentYear();
ArrayList<String> failedConnectionNos = new ArrayList<String>();
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo,
tenantId);

log.info("connectionNos" + connectionNos.size());
log.info("connectionNos" + connectionNos);
log.info("dayStartTime:"+dayStartTime);
log.info("dayEndTime"+dayEndTime);

for (String connectionNo : connectionNos) {
CalculationCriteria calculationCriteria = CalculationCriteria.builder().tenantId(tenantId)
.assessmentYear(assessmentYear).connectionNo(connectionNo).from(dayStartTime).to(dayEndTime).build();
Expand All @@ -310,26 +313,58 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
failedConnectionNos.add(connectionNo);
continue;
}

HashMap<Object, Object> genarateDemandData = new HashMap<Object, Object>();
genarateDemandData.put("calculationReq", calculationReq);
genarateDemandData.put("billingCycle",billingCycle);
genarateDemandData.put("masterMap",masterMap);
genarateDemandData.put("isSendMessage",isSendMessage);
genarateDemandData.put("tenantId",tenantId);

/*
* List<Demand> demands = demandService.searchDemand(tenantId, consumerCodes,
* previousFromDate.getTimeInMillis(), previousToDate.getTimeInMillis(),
* requestInfo); if (demands != null && demands.size() == 0) {
* log.warn("this connection doen't have the demand in previous billing cycle :"
* + connectionNo ); continue; }
*/
try {
if(!tenantId.equals(config.getSmsExcludeTenant())) {
generateDemandInBatch(calculationReq, masterMap, billingCycle, isSendMessage);
}
log.info("sending generate demand for connection no :"+connectionNo);
producer.push(config.getWsGenerateDemandBulktopic(),genarateDemandData);

} catch (Exception e) {
System.out.println("Got the exception while genating the demands:" + e);
failedConnectionNos.add(connectionNo);
}
HashMap<String, String> demandMessage = util.getLocalizationMessage(requestInfo,
WSCalculationConstant.mGram_Consumer_NewDemand, tenantId);
HashMap<String, String> gpwscMap = util.getLocalizationMessage(requestInfo, tenantId, tenantId);
UserDetailResponse userDetailResponse = userService.getUserByRoleCodes(requestInfo,
Arrays.asList("COLLECTION_OPERATOR"), tenantId);
Map<String, String> mobileNumberIdMap = new LinkedHashMap<>();
String msgLink = config.getNotificationUrl() + config.getGpUserDemandLink();
for (OwnerInfo userInfo : userDetailResponse.getUser()) {
if (userInfo.getName() != null) {
mobileNumberIdMap.put(userInfo.getMobileNumber(), userInfo.getName());
} else {
mobileNumberIdMap.put(userInfo.getMobileNumber(), userInfo.getUserName());
}
}
System.out.println("demand Failed event Messages to the GP users ");
if (isSendMessage && failedConnectionNos.size() > 0) {
mobileNumberIdMap.entrySet().stream().forEach(map -> {
String msg = demandMessage.get(WSCalculationConstant.MSG_KEY);
msg = msg.replace("{ownername}", map.getValue());
msg = msg.replace("{villagename}",
(gpwscMap != null && !StringUtils.isEmpty(gpwscMap.get(WSCalculationConstant.MSG_KEY)))
? gpwscMap.get(WSCalculationConstant.MSG_KEY)
: tenantId);
msg = msg.replace("{billingcycle}", billingCycle);
msg = msg.replace("{LINK}", msgLink);
if(!map.getKey().equals(config.getPspclVendorNumber())) {
SMSRequest smsRequest = SMSRequest.builder().mobileNumber(map.getKey()).message(msg)
.tenantid(tenantId)
.category(Category.TRANSACTION).build();
if(config.isSmsForDemandEnable()) {
producer.push(config.getSmsNotifTopic(), smsRequest);
}
}

});
/* if (isSendMessage && failedConnectionNos.size() > 0) {
List<ActionItem> actionItems = new ArrayList<>();
String actionLink = config.getBulkDemandFailedLink();
ActionItem actionItem = ActionItem.builder().actionUrl(actionLink).build();
Expand All @@ -344,7 +379,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
WSCalculationConstant.GENERATE_DEMAND_EVENT, tenantId);
String messages = failedMessage.get(WSCalculationConstant.MSG_KEY);
messages = messages.replace("{BILLING_CYCLE}", LocalDate.now().getMonth().toString());

additionals.put("localizationCode", WSCalculationConstant.GENERATE_DEMAND_EVENT);
HashMap<String, String> attributes = new HashMap<String, String>();
attributes.put("{BILLING_CYCLE}", LocalDate.now().getMonth().toString());
Expand Down Expand Up @@ -477,7 +512,24 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
}

});
}*/
}

public void generateDemandInBulk(CalculationReq calculationReq, String billingCycle, Map<String, Object> masterMap,
boolean isSendMessage,String tenantId) {
log.info("masterMap:"+masterMap);
try {
if(!tenantId.equals(config.getSmsExcludeTenant())) {
generateDemandInBatch(calculationReq, masterMap, billingCycle, isSendMessage);
}

} catch (Exception e) {
e.printStackTrace();
System.out.println("Got the exception while genating the demands:" + e);
log.info("Errro in Apllication no :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());

}

}

/**
Expand Down Expand Up @@ -513,9 +565,8 @@ private Recipient getRecepient(RequestInfo requestInfo, String tenantId) {

@SuppressWarnings("unchecked")
@KafkaListener(topics = {
"${egov.generate.bulk.demand.manually.topic}" }, containerFactory = "kafkaListenerContainerFactory")
"${egov.generate.bulk.demand.manually.topic}" })
public void generateBulkDemandForULB(HashMap<Object, Object> messageData) {
log.info("Billing master data values for non metered connection:: {}", messageData);
Map<String, Object> billingMasterData;
BulkDemand bulkDemand;
boolean isSendMessage = false;
Expand All @@ -533,10 +584,29 @@ public void generateBulkDemandForULB(HashMap<Object, Object> messageData) {

}
@KafkaListener(topics = {
"${egov.update.demand.add.penalty}" }, containerFactory = "kafkaListenerContainerFactory")
"${egov.update.demand.add.penalty}" })
public void updateAddPenalty(HashMap<Object, Object> messageData) {
DemandRequest demandRequest = mapper.convertValue(messageData, DemandRequest.class);
demandService.updateDemandAddPenalty(demandRequest.getRequestInfo(), demandRequest.getDemands());
}

@KafkaListener(topics = {
"${ws.generate.demand.bulk}" })
public void generateDemandInBulkListner(HashMap<Object, Object> messageData) {
CalculationReq calculationReq= new CalculationReq();
Map<String, Object> masterMap = new HashMap<>();
String billingCycle ;
boolean isSendMessage = true;
String tenantId="";
HashMap<Object, Object> genarateDemandData = (HashMap<Object, Object>) messageData;
masterMap = (Map<String, Object>) genarateDemandData.get("masterMap");
calculationReq = mapper.convertValue(genarateDemandData.get("calculationReq"), CalculationReq.class);
billingCycle= (String) genarateDemandData.get("billingCycle");
isSendMessage= (boolean) genarateDemandData.get("isSendMessage");
tenantId=(String) genarateDemandData.get("tenantId");

log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
generateDemandInBulk(calculationReq,billingCycle,masterMap,isSendMessage,tenantId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public List<Demand> saveDemand(RequestInfo requestInfo, List<Demand> demands){
StringBuilder url = new StringBuilder(config.getBillingServiceHost());
url.append(config.getDemandCreateEndPoint());
DemandRequest request = new DemandRequest(requestInfo,demands);
log.info("Creating demand for consumer code: "+request.getDemands().get(0).getConsumerCode());
Object result = serviceRequestRepository.fetchResult(url, request);
try{
return mapper.convertValue(result,DemandResponse.class).getDemands();
Expand All @@ -69,6 +70,7 @@ public List<Demand> updateDemand(RequestInfo requestInfo, List<Demand> demands){
StringBuilder url = new StringBuilder(config.getBillingServiceHost());
url.append(config.getDemandUpdateEndPoint());
DemandRequest request = new DemandRequest(requestInfo,demands);
log.info("Updating demand for consumer code: "+request.getDemands().get(0).getConsumerCode());
Object result = serviceRequestRepository.fetchResult(url, request);
try{
return mapper.convertValue(result,DemandResponse.class).getDemands();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ public class ServiceRequestRepository {
public Object fetchResult(StringBuilder uri, Object request) {
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
Object response = null;
log.debug("URI: " + uri);
try {
log.debug("Request: " + mapper.writeValueAsString(request));
response = restTemplate.postForObject(uri.toString(), request, Map.class);
} catch (HttpClientErrorException e) {
log.error("External Service threw an Exception: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public List<MeterReading> searchMeterReadings(MeterReadingSearchCriteria criteri
String query = queryBuilder.getSearchQueryString(criteria, preparedStatement);
if (query == null)
return Collections.emptyList();
log.debug("Query: " + query);
log.debug("Prepared Statement" + preparedStatement.toString());
return jdbcTemplate.query(query, preparedStatement.toArray(), meterReadingRowMapper);
}

Expand All @@ -78,8 +76,6 @@ public List<MeterReading> searchCurrentMeterReadings(MeterReadingSearchCriteria
String query = queryBuilder.getCurrentReadingConnectionQuery(criteria, preparedStatement);
if (query == null)
return Collections.emptyList();
log.debug("Query: " + query);
log.debug("Prepared Statement" + preparedStatement.toString());
return jdbcTemplate.query(query, preparedStatement.toArray(), currentMeterReadingRowMapper);
}

Expand All @@ -94,7 +90,6 @@ public int isMeterReadingConnectionExist(List<String> ids) {
Set<String> connectionIds = new HashSet<>(ids);
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.getNoOfMeterReadingConnectionQuery(connectionIds, preparedStatement);
log.debug("Query: " + query);
return jdbcTemplate.queryForObject(query, preparedStatement.toArray(), Integer.class);
}

Expand All @@ -104,7 +99,6 @@ public ArrayList<String> searchTenantIds() {
String query = queryBuilder.getTenantIdConnectionQuery();
if (query == null)
return tenantIds;
log.debug("Query: " + query);
tenantIds = (ArrayList<String>) jdbcTemplate.queryForList(query, String.class);
return tenantIds;
}
Expand All @@ -117,8 +111,6 @@ public ArrayList<String> searchConnectionNos(String connectionType, String tenan
tenantId);
if (query == null)
return connectionNos;
log.info("Query: " + query);

connectionNos = (ArrayList<String>) jdbcTemplate.query(query, preparedStatement.toArray(),
demandSchedulerRowMapper);
return connectionNos;
Expand All @@ -128,30 +120,26 @@ public ArrayList<String> searchConnectionNos(String connectionType, String tenan
public List<String> getConnectionsNoList(String tenantId, String connectionType) {
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.getConnectionNumberList(tenantId, connectionType, preparedStatement);
log.info("water " + connectionType + " connection list : " + query);
return jdbcTemplate.query(query, preparedStatement.toArray(), demandSchedulerRowMapper);
}

@Override
public List<String> getNonMeterConnectionsList(String tenantId, Long dayStartTime, Long dayEndTime) {
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.getNonMeteredConnectionsList(tenantId, dayStartTime, dayEndTime, preparedStatement);
log.info("water NMconnection list query: " + query);
return jdbcTemplate.query(query, preparedStatement.toArray(), demandSchedulerRowMapper);
}

@Override
public List<String> getTenantId() {
String query = queryBuilder.getDistinctTenantIds();
log.info("Tenant Id's List Query : " + query);
return jdbcTemplate.queryForList(query, String.class);
}

@Override
public int isBillingPeriodExists(String connectionNo, String billingPeriod) {
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.isBillingPeriodExists(connectionNo, billingPeriod, preparedStatement);
log.info("Is BillingPeriod Exits Query: " + query);
return jdbcTemplate.queryForObject(query, preparedStatement.toArray(), Integer.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ public String previousBillingCycleDemandQuery(Set<String> connectionNos, String
}
if(!CollectionUtils.isEmpty(preparedStmtList))
builder.append("and status not IN ('CANCELLED')");

System.out.println("Final query ::" + builder.toString());

return builder.toString();
}

Expand All @@ -273,7 +272,6 @@ public String previousBillingCycleConnectionQuery(Set<String> connectionNos, Str
builder.append(" tenantId =? ");
preparedStmtList.add(tenantId);
}
System.out.println("Final conn query ::" + builder.toString());
return builder.toString();
}

Expand Down
Loading
Loading