diff --git a/DSL/CronManager/DSL/dataset_processing.yml b/DSL/CronManager/DSL/dataset_processing.yml
index 23fe2743..c8a3b03b 100644
--- a/DSL/CronManager/DSL/dataset_processing.yml
+++ b/DSL/CronManager/DSL/dataset_processing.yml
@@ -2,10 +2,10 @@ dataset_processor:
   trigger: off
   type: exec
   command: "../app/scripts/data_processor_exec.sh"
-  allowedEnvs: ["cookie","dgId","updateType","savedFilePath","patchPayload"]
+  allowedEnvs: ["cookie","dgId", "newDgId","updateType","savedFilePath","patchPayload"]
 
 data_validation:
   trigger: off
   type: exec
   command: "../app/scripts/data_validator_exec.sh"
-  allowedEnvs: ["cookie","dgId","updateType","savedFilePath","patchPayload"]
\ No newline at end of file
+  allowedEnvs: ["cookie","dgId", "newDgId","updateType","savedFilePath","patchPayload"]
\ No newline at end of file
diff --git a/DSL/CronManager/script/data_processor_exec.sh b/DSL/CronManager/script/data_processor_exec.sh
index 35f2c40b..34d0cc43 100644
--- a/DSL/CronManager/script/data_processor_exec.sh
+++ b/DSL/CronManager/script/data_processor_exec.sh
@@ -1,20 +1,21 @@
 #!/bin/bash
 
 # Ensure required environment variables are set
-if [ -z "$dgId" ] || [ -z "$cookie" ] || [ -z "$updateType" ] || [ -z "$savedFilePath" ] || [ -z "$patchPayload" ]; then
+if [ -z "$dgId" ] || [ -z "$newDgId" ] || [ -z "$cookie" ] || [ -z "$updateType" ] || [ -z "$savedFilePath" ] || [ -z "$patchPayload" ]; then
   echo "One or more environment variables are missing."
-  echo "Please set dgId, cookie, updateType, savedFilePath, and patchPayload."
+  echo "Please set dgId, newDgId, cookie, updateType, savedFilePath, and patchPayload."
   exit 1
 fi
 
-# Construct the payload using grep
+# Construct the payload using here document
 payload=$(cat <0:
@@ -300,14 +332,14 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
 
         if updateType == "minor_initial_update":
             print("Handling Minor update")
-            # dataset = self.get_dataset(dgID, cookie)
+            # dataset = self.get_dataset(dgId, cookie)
             dataset = self.get_dataset_by_location(savedFilePath, cookie)
             if dataset is not None:
                 print("Dataset retrieved successfully")
                 structured_data = self.check_and_convert(dataset)
                 if structured_data is not None:
                     print("Dataset converted successfully")
-                    selected_data_fields_to_enrich = self.get_selected_data_fields(dgID, cookie)
+                    selected_data_fields_to_enrich = self.get_selected_data_fields(newDgId, cookie)
                     if selected_data_fields_to_enrich is not None:
                         print("Selected data fields to enrich retrieved successfully")
                         max_row_id = max(item["rowId"] for item in structured_data)
@@ -317,7 +349,7 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
 
                         if enriched_data is not None:
                             print("Data enrichment successful")
-                            stop_words = self.get_stopwords(dgID, cookie)
+                            stop_words = self.get_stopwords(newDgId, cookie)
                             if stop_words is not None:
                                 print("Stop words retrieved successfully")
                                 print(agregated_dataset)
@@ -329,12 +361,12 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                                     if chunked_data is not None:
                                         print("Data chunking successful")
                                         print(chunked_data)
-                                        operation_result = self.save_chunked_data(chunked_data, cookie, dgID, 0)
+                                        operation_result = self.save_chunked_data(chunked_data, cookie, newDgId, 0)
                                         if operation_result:
                                             print("Chunked data saved successfully")
-                                            agregated_dataset_operation = self.save_aggregrated_data(dgID, cookie, cleaned_data)
+                                            agregated_dataset_operation = self.save_aggregrated_data(newDgId, cookie, cleaned_data)
                                             if agregated_dataset_operation != None:
-                                                return_data = self.update_preprocess_status(dgID, cookie, True, False, f"/dataset/{dgID}/chunks/", "", True, len(cleaned_data), len(chunked_data))
+                                                return_data = self.update_preprocess_status(newDgId, cookie, True, False, f"/dataset/{newDgId}/chunks/", "", True, len(cleaned_data), len(chunked_data))
                                                 print(return_data)
                                                 return SUCCESSFUL_OPERATION
                                             else:
@@ -366,7 +398,7 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                 return FAILED_TO_GET_DATASET
         elif updateType == "minor_append_update":
             print("Handling Minor update")
-            agregated_dataset = self.get_dataset(dgID, cookie)
+            agregated_dataset = self.get_dataset(dgId, cookie)
             max_row_id = max(item["rowId"] for item in agregated_dataset)
             if agregated_dataset is not None:
                 print("Aggregated dataset retrieved successfully")
@@ -378,14 +410,14 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                 print(structured_data[-1])
                 if structured_data is not None:
                     print("Minor update dataset converted successfully")
-                    selected_data_fields_to_enrich = self.get_selected_data_fields(dgID, cookie)
+                    selected_data_fields_to_enrich = self.get_selected_data_fields(newDgId, cookie)
                     if selected_data_fields_to_enrich is not None:
                         print("Selected data fields to enrich for minor update retrieved successfully")
                         max_row_id = max(item["rowId"] for item in structured_data)
                         enriched_data = self.enrich_data(structured_data, selected_data_fields_to_enrich, max_row_id)
                         if enriched_data is not None:
                             print("Minor update data enrichment successful")
-                            stop_words = self.get_stopwords(dgID, cookie)
+                            stop_words = self.get_stopwords(newDgId, cookie)
                             if stop_words is not None:
                                 combined_new_dataset = structured_data + enriched_data
                                 print("Stop words for minor update retrieved successfully")
@@ -395,18 +427,19 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                                     chunked_data = self.chunk_data(cleaned_data)
                                     if chunked_data is not None:
                                         print("Minor update data chunking successful")
-                                        page_count = self.get_page_count(dgID, cookie)
+                                        page_count = self.get_page_count(dgId, cookie)
                                         if page_count is not None:
                                             print(f"Page count retrieved successfully: {page_count}")
                                             print(chunked_data)
-                                            operation_result = self.save_chunked_data(chunked_data, cookie, dgID, page_count)
+                                            copy_exsisting_files = self.copy_chunked_datafiles(dgId, newDgId, cookie, page_count)
+                                            operation_result = self.save_chunked_data(chunked_data, cookie, newDgId, page_count)
                                             if operation_result is not None:
                                                 print("Chunked data for minor update saved successfully")
                                                 agregated_dataset += cleaned_data
-                                                agregated_dataset_operation = self.save_aggregrated_data(dgID, cookie, agregated_dataset)
+                                                agregated_dataset_operation = self.save_aggregrated_data(newDgId, cookie, agregated_dataset)
                                                 if agregated_dataset_operation:
                                                     print("Aggregated dataset for minor update saved successfully")
-                                                    return_data = self.update_preprocess_status(dgID, cookie, True, False, f"/dataset/{dgID}/chunks/", "", True, len(cleaned_data), (len(chunked_data)+page_count))
+                                                    return_data = self.update_preprocess_status(newDgId, cookie, True, False, f"/dataset/{newDgId}/chunks/", "", True, len(agregated_dataset), (len(chunked_data)+page_count))
                                                     print(return_data)
                                                     return SUCCESSFUL_OPERATION
                                                 else:
@@ -449,13 +482,13 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
             print("*************")
             if (data_payload["editedData"]!=[]):
                 print("Handling Patch update")
-                stop_words = self.get_stopwords(dgID, cookie)
+                stop_words = self.get_stopwords(dgId, cookie)
                 if stop_words is not None:
                     print("Stop words for patch update retrieved successfully")
                     cleaned_patch_payload = self.remove_stop_words(data_payload["editedData"], stop_words)
                     if cleaned_patch_payload is not None:
                         print("Stop words for patch update removed successfully")
-                        page_count = self.get_page_count(dgID, cookie)
+                        page_count = self.get_page_count(dgId, cookie)
                         if page_count is not None:
                             print(f"Page count for patch update retrieved successfully: {page_count}")
                             print(cleaned_patch_payload)
@@ -468,7 +501,7 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                                     chunk_updates[chunkNum].append(entry)
                             print(f"Chunk updates prepared: {chunk_updates}")
                             for chunkNum, entries in chunk_updates.items():
-                                chunk_data = self.download_chunk(dgID, cookie, chunkNum)
+                                chunk_data = self.download_chunk(dgId, cookie, chunkNum)
                                 if chunk_data is not None:
                                     print(f"Chunk {chunkNum} downloaded successfully")
                                     for entry in entries:
@@ -477,14 +510,14 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                                             if chunk_entry.get("rowId") == rowId:
                                                 chunk_data[idx] = entry
                                                 break
-                                    chunk_save_operation = self.save_chunked_data([chunk_data], cookie, dgID, chunkNum-1)
+                                    chunk_save_operation = self.save_chunked_data([chunk_data], cookie, dgId, chunkNum-1)
                                     if chunk_save_operation == None:
                                         print(f"Failed to save chunk {chunkNum}")
                                         return FAILED_TO_SAVE_CHUNKED_DATA
                                 else:
                                     print(f"Failed to download chunk {chunkNum}")
                                     return FAILED_TO_DOWNLOAD_CHUNK
-                            agregated_dataset = self.get_dataset(dgID, cookie)
+                            agregated_dataset = self.get_dataset(dgId, cookie)
                             if agregated_dataset is not None:
                                 print("Aggregated dataset for patch update retrieved successfully")
                                 for entry in cleaned_patch_payload:
@@ -496,7 +529,7 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                                             agregated_dataset[index] = entry
                                             break
 
-                                save_result_update = self.save_aggregrated_data(dgID, cookie, agregated_dataset)
+                                save_result_update = self.save_aggregrated_data(dgId, cookie, agregated_dataset)
                                 if save_result_update:
                                     print("Aggregated dataset for patch update saved successfully")
                                     # return SUCCESSFUL_OPERATION
@@ -521,7 +554,7 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
             try:
                 print("Handling deleted data rows")
                 deleted_rows = data_payload["deletedDataRows"]
-                aggregated_dataset = self.get_dataset(dgID, cookie)
+                aggregated_dataset = self.get_dataset(dgId, cookie)
                 if aggregated_dataset is not None:
                     print("Aggregated dataset for delete operation retrieved successfully")
                     updated_dataset = [row for row in aggregated_dataset if row.get('rowId') not in deleted_rows]
@@ -533,10 +566,10 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                             if chunked_data is not None:
                                 print("Data chunking after delete operation successful")
                                 print(chunked_data)
-                                operation_result = self.save_chunked_data(chunked_data, cookie, dgID, 0)
+                                operation_result = self.save_chunked_data(chunked_data, cookie, dgId, 0)
                                 if operation_result:
                                     print("Chunked data after delete operation saved successfully")
-                                    save_result_delete = self.save_aggregrated_data(dgID, cookie, updated_dataset)
+                                    save_result_delete = self.save_aggregrated_data(dgId, cookie, updated_dataset)
                                     if save_result_delete:
                                         print("Aggregated dataset after delete operation saved successfully")
                                     else:
@@ -567,14 +600,14 @@ def process_handler(self, dgID, cookie, updateType, savedFilePath, patchPayload)
                     return FAILED_TO_SAVE_AGGREGATED_DATA
             elif data_payload["editedData"]==[] and data_payload["deletedDataRows"]!=[]:
                 if save_result_delete:
-                    return_data = self.update_preprocess_status(dgID, cookie, True, False, f"/dataset/{dgID}/chunks/", "", True, len(updated_dataset), len(chunked_data))
+                    return_data = self.update_preprocess_status(dgId, cookie, True, False, f"/dataset/{dgId}/chunks/", "", True, len(updated_dataset), len(chunked_data))
                     print(return_data)
                     return SUCCESSFUL_OPERATION
                 else:
                     return FAILED_TO_SAVE_AGGREGATED_DATA
             elif data_payload["editedData"]!=[] and data_payload["deletedDataRows"]!=[]:
                 if save_result_update and save_result_delete:
-                    return_data = self.update_preprocess_status(dgID, cookie, True, False, f"/dataset/{dgID}/chunks/", "", True, len(updated_dataset), len(chunked_data))
+                    return_data = self.update_preprocess_status(dgId, cookie, True, False, f"/dataset/{dgId}/chunks/", "", True, len(updated_dataset), len(chunked_data))
                     print(return_data)
                     return SUCCESSFUL_OPERATION
                 else:
diff --git a/dataset-processor/dataset_processor_api.py b/dataset-processor/dataset_processor_api.py
index 4edb2094..d2d831f4 100644
--- a/dataset-processor/dataset_processor_api.py
+++ b/dataset-processor/dataset_processor_api.py
@@ -19,7 +19,8 @@
 )
 
 class ProcessHandlerRequest(BaseModel):
-    dgID: int
+    dgId: int
+    newDgId: int
     cookie: str
     updateType: str
     savedFilePath: str
@@ -47,7 +48,7 @@ async def process_handler_endpoint(request: Request):
     await authenticate_user(request)
 
     authCookie = payload["cookie"]
-    result = processor.process_handler(int(payload["dgID"]), authCookie, payload["updateType"], payload["savedFilePath"], payload["patchPayload"])
+    result = processor.process_handler(int(payload["dgId"]), int(payload["newDgId"]), authCookie, payload["updateType"], payload["savedFilePath"], payload["patchPayload"])
     if result:
         return result
     else:
@@ -69,6 +70,7 @@ async def forward_request(request: Request, response: Response):
     print(payload)
     payload2 = {}
     payload2["dgId"] = int(payload["dgId"])
+    payload2["newDgId"] = int(payload["newDgId"])
     payload2["updateType"] = payload["updateType"]
     payload2["patchPayload"] = payload["patchPayload"]
     payload2["savedFilePath"] = payload["savedFilePath"]
diff --git a/docker-compose.yml b/docker-compose.yml
index 69b10c7b..130113de 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -226,7 +226,8 @@ services:
       - FILE_HANDLER_DOWNLOAD_LOCATION_JSON_URL=http://file-handler:8000/datasetgroup/data/download/json/location
       - FILE_HANDLER_STOPWORDS_URL=http://file-handler:8000/datasetgroup/data/download/json/stopwords
       - FILE_HANDLER_IMPORT_CHUNKS_URL=http://file-handler:8000/datasetgroup/data/import/chunk
-      - GET_PAGE_COUNT_URL=http://ruuter-private:8088/classifier/datasetgroup/group/data?groupId=dgID&pageNum=1
+      - FILE_HANDLER_COPY_CHUNKS_URL=http://file-handler:8000/datasetgroup/data/copy
+      - GET_PAGE_COUNT_URL=http://ruuter-private:8088/classifier/datasetgroup/group/data?groupId=dgId&pageNum=1
       - SAVE_JSON_AGGREGRATED_DATA_URL=http://file-handler:8000/datasetgroup/data/import/json
       - DOWNLOAD_CHUNK_URL=http://file-handler:8000/datasetgroup/data/download/chunk
       - STATUS_UPDATE_URL=http://ruuter-private:8088/classifier/datasetgroup/update/preprocess/status
diff --git a/file-handler/file_handler_api.py b/file-handler/file_handler_api.py
index b443c418..418a37cc 100644
--- a/file-handler/file_handler_api.py
+++ b/file-handler/file_handler_api.py
@@ -42,6 +42,11 @@ class ImportJsonMajor(BaseModel):
     dgId: int
     dataset: list
 
+class CopyPayload(BaseModel):
+    dgId: int
+    newDgId: int
+    fileLocations: list
+
 if not os.path.exists(UPLOAD_DIRECTORY):
     os.makedirs(UPLOAD_DIRECTORY)
 
@@ -282,3 +287,36 @@ async def upload_and_copy(request: Request, importData: ImportJsonMajor):
         return JSONResponse(status_code=200, content=upload_success)
     else:
         raise HTTPException(status_code=500, detail=S3_UPLOAD_FAILED)
+
+@app.post("/datasetgroup/data/copy")
+async def upload_and_copy(request: Request, copyPayload: CopyPayload):
+    cookie = request.cookies.get("customJwtCookie")
+    await authenticate_user(f'customJwtCookie={cookie}')
+
+    dg_id = copyPayload.dgId
+    new_dg_id = copyPayload.newDgId
+    files = copyPayload.fileLocations
+
+    if len(files)>0:
+        local_storage_location = "temp_copy.json"
+    else:
+        print("Abort copying since sent file list does not have any entry.")
+        upload_success = UPLOAD_SUCCESS.copy()
+        upload_success["saved_file_path"] = ""
+        return JSONResponse(status_code=200, content=upload_success)
+    for file in files:
+        old_location = f"/dataset/{dg_id}/{file}"
+        new_location = f"/dataset/{new_dg_id}/{file}"
+        response = s3_ferry.transfer_file(local_storage_location, "FS", old_location, "S3")
+        response = s3_ferry.transfer_file(new_location, "S3", local_storage_location, "FS")
+
+        if response.status_code == 201:
+            print(f"Copying completed : {file}")
+        else:
+            print(f"Copying failed : {file}")
+            raise HTTPException(status_code=500, detail=S3_UPLOAD_FAILED)
+    else:
+        os.remove(local_storage_location)
+        upload_success = UPLOAD_SUCCESS.copy()
+        upload_success["saved_file_path"] = f"/dataset/{new_dg_id}/"
+        return JSONResponse(status_code=200, content=upload_success)
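
Note (not part of the patch): a minimal sketch of how the new /datasetgroup/data/copy endpoint added in file-handler/file_handler_api.py could be exercised once the stack is running. The route, the CopyPayload fields (dgId, newDgId, fileLocations), the customJwtCookie cookie, and the saved_file_path response key come from the diff above; the base URL, the example IDs, the file names, and the JWT value are assumptions for illustration only.

# copy_chunks_example.py -- illustrative sketch, not shipped with this change.
import requests

FILE_HANDLER_URL = "http://localhost:8000"  # assumed host; compare FILE_HANDLER_COPY_CHUNKS_URL in docker-compose.yml

# Ask the file handler to copy existing chunk files from dataset group 11 to 12.
payload = {
    "dgId": 11,                             # source dataset group (assumed ID)
    "newDgId": 12,                          # target dataset group (assumed ID)
    "fileLocations": ["1.json", "2.json"],  # hypothetical chunk file names under /dataset/11/
}

response = requests.post(
    f"{FILE_HANDLER_URL}/datasetgroup/data/copy",
    json=payload,
    cookies={"customJwtCookie": "<jwt>"},   # placeholder token; the endpoint authenticates this cookie
    timeout=30,
)
response.raise_for_status()
print(response.json())  # on success the body includes "saved_file_path": "/dataset/12/"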