Commit

Merge pull request #115 from rootcodelabs/dev
Pulling changes from ESCLASS-45-Dataset-Groups
Thirunayan22 authored Jul 26, 2024
2 parents a756419 + 260a6ec commit 8a8d167
Showing 7 changed files with 194 additions and 79 deletions.
2 changes: 1 addition & 1 deletion DSL/Resql/get-dataset-group-fields-by-id.sql
@@ -1,2 +1,2 @@
SELECT validation_criteria
SELECT validation_criteria,num_pages
FROM dataset_group_metadata WHERE id =:id;
5 changes: 5 additions & 0 deletions DSL/Resql/update-dataset-group-validation-data.sql
@@ -0,0 +1,5 @@
UPDATE dataset_group_metadata
SET
validation_status = :validation_status::Validation_Status,
validation_errors = :validation_errors::jsonb
WHERE id = :id;
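For context, the new update-dataset-group-validation-data query is executed through the Resql service with named parameters matching the placeholders above. Below is a minimal sketch of such a call; the host and port are assumptions (stand-ins for the [#CLASSIFIER_RESQL] setting), not values taken from this commit.

# Sketch: exercise the new Resql query directly.
# RESQL_URL is a hypothetical value for [#CLASSIFIER_RESQL].
import json
import requests

RESQL_URL = "http://localhost:8082"  # assumption

payload = {
    "id": 42,
    "validation_status": "success",        # cast to Validation_Status by the query
    "validation_errors": json.dumps([]),   # cast to jsonb by the query
}

resp = requests.post(f"{RESQL_URL}/update-dataset-group-validation-data", json=payload)
print(resp.status_code)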
@@ -69,13 +69,15 @@ check_data_status:
assign_fields_response:
assign:
val: ${res_dataset.response.body[0].validationCriteria === null ? [] :JSON.parse(res_dataset.response.body[0].validationCriteria.value)}
num_pages: ${res_dataset.response.body[0].numPages}
next: assign_formated_response

assign_formated_response:
assign:
val: [{
dgId: '${group_id}',
fields: '${val === [] ? [] :val.fields}',
numPages: '${num_pages}',
dataPayload: '${res_data.response.body}'
}]
next: assign_success_response
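The num_pages value read from the metadata query is surfaced here as numPages in the formatted response. A rough sketch of the shape a consumer can expect follows; all values are illustrative, not taken from the repository.

# Illustrative shape of the formatted response (made-up values).
formatted_response = [{
    "dgId": "42",
    "fields": ["subject", "body"],   # [] when validationCriteria is null
    "numPages": 7,                   # new field sourced from num_pages
    "dataPayload": [{"rowID": 1, "subject": "example"}],
}]
print(formatted_response[0]["numPages"])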
@@ -86,8 +86,8 @@ check_old_dataset_status:
execute_cron_manager:
call: reflect.mock
args:
url: "[#CLASSIFIER_CRON_MANAGER]/execute/<group_name>/<job_name>"
body:
url: "[#CLASSIFIER_CRON_MANAGER]/execute/dataset_processing/data_validation"
query:
cookie: ${incoming.header.cookie}
dgId: ${dg_id}
updateType: 'minor'
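Both update flows now target the concrete dataset_processing/data_validation job and pass their arguments as query parameters rather than a request body (the call itself is still mocked via reflect.mock). A hedged sketch of the equivalent request is shown below; the base URL, HTTP method, and cookie value are assumptions, while the path and parameter names come from the DSL above.

# Sketch of the (currently mocked) cron manager trigger.
# CRON_MANAGER_URL is a hypothetical value for [#CLASSIFIER_CRON_MANAGER].
import requests

CRON_MANAGER_URL = "http://localhost:9010"  # assumption

resp = requests.post(
    f"{CRON_MANAGER_URL}/execute/dataset_processing/data_validation",
    params={"dgId": 42, "updateType": "minor"},
    headers={"Cookie": "<session cookie>"},  # placeholder
)
print(resp.status_code)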
@@ -72,8 +72,8 @@ check_old_dataset_status:
execute_cron_manager:
call: reflect.mock
args:
url: "[#CLASSIFIER_CRON_MANAGER]/execute/<group_name>/<job_name>"
body:
url: "[#CLASSIFIER_CRON_MANAGER]/execute/dataset_processing/data_validation"
query:
cookie: ${incoming.header.cookie}
dgId: ${dg_id}
updateType: 'patch'
@@ -0,0 +1,100 @@
declaration:
call: declare
version: 0.1
description: "Description placeholder for 'STATUS'"
method: post
accepts: json
returns: json
namespace: classifier
allowlist:
body:
- field: dgId
type: string
description: "Body field 'dgId'"
- field: updateType
type: string
description: "Body field 'updateType'"
- field: patchPayload
type: json
description: "Body field 'patchPayload'"
- field: savedFilePath
type: string
description: "Body field 'savedFilePath'"
- field: validationStatus
type: string
description: "Body field 'validationStatus'"
- field: validationErrors
type: array
description: "Body field 'validationErrors'"

extract_request_data:
assign:
dg_id: ${incoming.body.dgId}
update_type: ${incoming.body.updateType}
patch_payload: ${incoming.body.patchPayload}
save_file_path: ${incoming.body.savedFilePath}
validation_status: ${incoming.body.validationStatus}
validation_errors: ${incoming.body.validationErrors}
next: update_dataset_group_validation

update_dataset_group_validation:
call: http.post
args:
url: "[#CLASSIFIER_RESQL]/update-dataset-group-validation-data"
body:
id: ${dg_id}
validation_status: ${validation_status}
validation_errors: ${JSON.stringify(validation_errors)}
result: res
next: check_status

check_status:
switch:
- condition: ${200 <= res.response.statusCodeValue && res.response.statusCodeValue < 300}
next: check_validation_status_type
next: assign_fail_response

check_validation_status_type:
switch:
- condition: ${validation_status === 'success'}
next: execute_cron_manager
next: assign_success_response

execute_cron_manager:
call: reflect.mock
args:
url: "[#CLASSIFIER_CRON_MANAGER]/execute/dataset_processing/dataset_processor"
query:
cookie: ${incoming.header.cookie}
dgId: ${dg_id}
updateType: ${update_type}
savedFilePath: ${save_file_path}
patchPayload: ${patch_payload}
result: res
next: assign_success_response

assign_success_response:
assign:
format_res: {
dgId: '${dg_id}',
operationSuccessful: true,
}
next: return_ok

assign_fail_response:
assign:
format_res: {
dgId: '${dg_id}',
operationSuccessful: false,
}
next: return_bad_request

return_ok:
status: 200
return: ${format_res}
next: end

return_bad_request:
status: 400
return: ${format_res}
next: end
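The new service above accepts a POST with the allowlisted body fields and, when the validation status is 'success', forwards the work to the dataset_processing/dataset_processor job. A minimal client sketch follows; the Ruuter base URL and endpoint path are hypothetical, and only the body fields mirror the allowlist declared above.

# Hypothetical client call to the new validation-status service.
import requests

RUUTER_URL = "http://localhost:8088"                              # assumption
ENDPOINT = "/classifier/datasetgroup/update/validation/status"    # hypothetical path

body = {
    "dgId": "42",
    "updateType": "minor",
    "patchPayload": {},
    "savedFilePath": "/dataset/42/primary_dataset/dataset_42_aggregated.json",
    "validationStatus": "success",
    "validationErrors": [],
}

resp = requests.post(f"{RUUTER_URL}{ENDPOINT}", json=body,
                     headers={"Cookie": "<session cookie>"})
print(resp.json())  # expected shape: {"dgId": ..., "operationSuccessful": true/false}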
156 changes: 82 additions & 74 deletions file-handler/file_handler_api.py
@@ -1,4 +1,5 @@
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
import os
import json
@@ -14,14 +15,21 @@

app = FastAPI()

app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3001", "http://localhost:3002"],
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
)

UPLOAD_DIRECTORY = os.getenv("UPLOAD_DIRECTORY", "/shared")
RUUTER_PRIVATE_URL = os.getenv("RUUTER_PRIVATE_URL")
S3_FERRY_URL = os.getenv("S3_FERRY_URL")
s3_ferry = S3Ferry(S3_FERRY_URL)

class ExportFile(BaseModel):
dgId: int
version: str
exportType: str

if not os.path.exists(UPLOAD_DIRECTORY):
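The new CORS middleware admits only the two local front-end origins and allows credentials, so browsers will attach cookies on cross-origin calls from those hosts. One way to sanity-check the configuration is a preflight request; the file handler port below is an assumption.

# Sketch: verify the CORS preflight response for an allowed origin.
# FILE_HANDLER_URL is a hypothetical host/port for the file handler.
import requests

FILE_HANDLER_URL = "http://localhost:8000"  # assumption

resp = requests.options(
    f"{FILE_HANDLER_URL}/datasetgroup/data/import",
    headers={
        "Origin": "http://localhost:3001",
        "Access-Control-Request-Method": "POST",
    },
)
print(resp.headers.get("access-control-allow-origin"))
print(resp.headers.get("access-control-allow-credentials"))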
@@ -46,104 +54,104 @@ async def authenticate_user(request):

@app.post("/datasetgroup/data/import")
async def upload_and_copy(request: Request, dgId: int = Form(...), dataFile: UploadFile = File(...)):
await authenticate_user(request)

fileConverter = FileConverter()
file_type = fileConverter._detect_file_type(dataFile.filename)
fileName = f"{uuid.uuid4()}.{file_type}"
fileLocation = os.path.join(UPLOAD_DIRECTORY, fileName)

with open(fileLocation, "wb") as f:
f.write(dataFile.file.read())

success, convertedData = fileConverter.convert_to_json(fileLocation)
if not success:
upload_failed = UPLOAD_FAILED.copy()
upload_failed["reason"] = "Json file convert failed."
raise HTTPException(status_code=500, detail=upload_failed)

jsonLocalFilePath = fileLocation.replace(YAML_EXT, JSON_EXT).replace(YML_EXT, JSON_EXT).replace(XLSX_EXT, JSON_EXT)
with open(jsonLocalFilePath, 'w') as jsonFile:
json.dump(convertedData, jsonFile, indent=4)

saveLocation = f"/dataset/{dgId}/primary_dataset/dataset_{dgId}_aggregated{JSON_EXT}"
sourceFilePath = fileName.replace(YML_EXT, JSON_EXT).replace(XLSX_EXT, JSON_EXT)

response = s3_ferry.transfer_file(saveLocation, "S3", sourceFilePath, "FS")
if response.status_code == 201:
os.remove(fileLocation)
if fileLocation != jsonLocalFilePath:
os.remove(jsonLocalFilePath)
upload_success = UPLOAD_SUCCESS.copy()
upload_success["saved_file_path"] = saveLocation
return JSONResponse(status_code=200, content=upload_success)
else:
raise HTTPException(status_code=500, detail=S3_UPLOAD_FAILED)
try:
await authenticate_user(request)

print(f"Received dgId: {dgId}")
print(f"Received filename: {dataFile.filename}")

file_converter = FileConverter()
file_type = file_converter._detect_file_type(dataFile.filename)
file_name = f"{uuid.uuid4()}.{file_type}"
file_location = os.path.join(UPLOAD_DIRECTORY, file_name)

with open(file_location, "wb") as f:
f.write(dataFile.file.read())

success, converted_data = file_converter.convert_to_json(file_location)
if not success:
upload_failed = UPLOAD_FAILED.copy()
upload_failed["reason"] = "Json file convert failed."
raise HTTPException(status_code=500, detail=upload_failed)

json_local_file_path = file_location.replace(YAML_EXT, JSON_EXT).replace(YML_EXT, JSON_EXT).replace(XLSX_EXT, JSON_EXT)
with open(json_local_file_path, 'w') as json_file:
json.dump(converted_data, json_file, indent=4)

save_location = f"/dataset/{dgId}/primary_dataset/dataset_{dgId}_aggregated{JSON_EXT}"
source_file_path = file_name.replace(YML_EXT, JSON_EXT).replace(XLSX_EXT, JSON_EXT)

response = s3_ferry.transfer_file(save_location, "S3", source_file_path, "FS")
if response.status_code == 201:
os.remove(file_location)
if file_location != json_local_file_path:
os.remove(json_local_file_path)
upload_success = UPLOAD_SUCCESS.copy()
upload_success["saved_file_path"] = save_location
return JSONResponse(status_code=200, content=upload_success)
else:
raise HTTPException(status_code=500, detail=S3_UPLOAD_FAILED)
except Exception as e:
print(f"Exception in data/import : {e}")
raise HTTPException(status_code=500, detail=str(e))
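The import endpoint now wraps the whole flow in a try/except and logs the incoming dgId and filename. A hedged client sketch for uploading a dataset file is given below; the host, port, and cookie value are assumptions, while the form fields (dgId, dataFile) come from the endpoint signature above.

# Hypothetical upload call against /datasetgroup/data/import.
import requests

FILE_HANDLER_URL = "http://localhost:8000"  # assumption

with open("dataset.xlsx", "rb") as f:
    resp = requests.post(
        f"{FILE_HANDLER_URL}/datasetgroup/data/import",
        data={"dgId": 42},
        files={"dataFile": ("dataset.xlsx", f)},
        headers={"Cookie": "<session cookie>"},  # placeholder
    )
print(resp.status_code, resp.json())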

@app.post("/datasetgroup/data/download")
async def download_and_convert(request: Request, exportData: ExportFile, background_tasks: BackgroundTasks):
async def download_and_convert(request: Request, exportData: ExportFile, backgroundTasks: BackgroundTasks):
await authenticate_user(request)
dgId = exportData.dgId
version = exportData.version
exportType = exportData.exportType
dg_id = exportData.dgId
export_type = exportData.exportType

if exportType not in ["xlsx", "yaml", "json"]:
if export_type not in ["xlsx", "yaml", "json"]:
raise HTTPException(status_code=500, detail=EXPORT_TYPE_ERROR)

if version == "minor":
saveLocation = f"/dataset/{dgId}/minor_update_temp/minor_update_{JSON_EXT}"
localFileName = f"group_{dgId}minor_update"
elif version == "major":
saveLocation = f"/dataset/{dgId}/primary_dataset/dataset_{dgId}_aggregated{JSON_EXT}"
localFileName = f"group_{dgId}_aggregated"
else:
raise HTTPException(status_code=500, detail=IMPORT_TYPE_ERROR)
save_location = f"/dataset/{dg_id}/primary_dataset/dataset_{dg_id}_aggregated{JSON_EXT}"
local_file_name = f"group_{dg_id}_aggregated"

response = s3_ferry.transfer_file(f"{localFileName}{JSON_EXT}", "FS", saveLocation, "S3")
response = s3_ferry.transfer_file(f"{local_file_name}{JSON_EXT}", "FS", save_location, "S3")
if response.status_code != 201:
raise HTTPException(status_code=500, detail=S3_DOWNLOAD_FAILED)

jsonFilePath = os.path.join('..', 'shared', f"{localFileName}{JSON_EXT}")
json_file_path = os.path.join('..', 'shared', f"{local_file_name}{JSON_EXT}")

fileConverter = FileConverter()
with open(f"{jsonFilePath}", 'r') as jsonFile:
jsonData = json.load(jsonFile)
file_converter = FileConverter()
with open(f"{json_file_path}", 'r') as json_file:
json_data = json.load(json_file)

if exportType == "xlsx":
outputFile = f"{localFileName}{XLSX_EXT}"
fileConverter.convert_json_to_xlsx(jsonData, outputFile)
elif exportType == "yaml":
outputFile = f"{localFileName}{YAML_EXT}"
fileConverter.convert_json_to_yaml(jsonData, outputFile)
elif exportType == "json":
outputFile = f"{jsonFilePath}"
if export_type == "xlsx":
output_file = f"{local_file_name}{XLSX_EXT}"
file_converter.convert_json_to_xlsx(json_data, output_file)
elif export_type == "yaml":
output_file = f"{local_file_name}{YAML_EXT}"
file_converter.convert_json_to_yaml(json_data, output_file)
elif export_type == "json":
output_file = f"{json_file_path}"
else:
raise HTTPException(status_code=500, detail=EXPORT_TYPE_ERROR)

background_tasks.add_task(os.remove, jsonFilePath)
if outputFile != jsonFilePath:
background_tasks.add_task(os.remove, outputFile)
backgroundTasks.add_task(os.remove, json_file_path)
if output_file != json_file_path:
backgroundTasks.add_task(os.remove, output_file)

return FileResponse(outputFile, filename=os.path.basename(outputFile))
return FileResponse(output_file, filename=os.path.basename(output_file))
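The download endpoint no longer branches on a minor/major version and always reads the aggregated primary dataset. A sketch of the call follows; host, port, and cookie are assumptions, and the JSON body mirrors the ExportFile model (version is included defensively in case the model still declares it).

# Hypothetical call to /datasetgroup/data/download.
import requests

FILE_HANDLER_URL = "http://localhost:8000"  # assumption

resp = requests.post(
    f"{FILE_HANDLER_URL}/datasetgroup/data/download",
    json={"dgId": 42, "version": "major", "exportType": "xlsx"},
    headers={"Cookie": "<session cookie>"},  # placeholder
)
with open("group_42_aggregated.xlsx", "wb") as out:
    out.write(resp.content)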

@app.get("/datasetgroup/data/download/chunk")
async def download_and_convert(request: Request, dgId: int, pageId: int, background_tasks: BackgroundTasks):
async def download_and_convert(request: Request, dgId: int, pageId: int, backgroundTasks: BackgroundTasks):
await authenticate_user(request)
saveLocation = f"/dataset/{dgId}/chunks/{pageId}{JSON_EXT}"
localFileName = f"group_{dgId}_chunk_{pageId}"
save_location = f"/dataset/{dgId}/chunks/{pageId}{JSON_EXT}"
local_file_name = f"group_{dgId}_chunk_{pageId}"

response = s3_ferry.transfer_file(f"{localFileName}{JSON_EXT}", "FS", saveLocation, "S3")
response = s3_ferry.transfer_file(f"{local_file_name}{JSON_EXT}", "FS", save_location, "S3")
if response.status_code != 201:
raise HTTPException(status_code=500, detail=S3_DOWNLOAD_FAILED)

jsonFilePath = os.path.join('..', 'shared', f"{localFileName}{JSON_EXT}")
json_file_path = os.path.join('..', 'shared', f"{local_file_name}{JSON_EXT}")

with open(f"{jsonFilePath}", 'r') as jsonFile:
jsonData = json.load(jsonFile)
with open(f"{json_file_path}", 'r') as json_file:
json_data = json.load(json_file)

for index, item in enumerate(jsonData, start=1):
for index, item in enumerate(json_data, start=1):
item['rowID'] = index

background_tasks.add_task(os.remove, jsonFilePath)
backgroundTasks.add_task(os.remove, json_file_path)

return jsonData
return json_data
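The chunk endpoint annotates each row with a rowID before returning the JSON. A minimal sketch of fetching one chunk, under the same host assumption as above:

# Hypothetical call to /datasetgroup/data/download/chunk.
import requests

FILE_HANDLER_URL = "http://localhost:8000"  # assumption

resp = requests.get(
    f"{FILE_HANDLER_URL}/datasetgroup/data/download/chunk",
    params={"dgId": 42, "pageId": 1},
    headers={"Cookie": "<session cookie>"},  # placeholder
)
rows = resp.json()
print(rows[0]["rowID"] if rows else "empty chunk")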
