diff --git a/configuration/etl/etl.d/federated.json b/configuration/etl/etl.d/federated.json index 2798689..53c0db7 100644 --- a/configuration/etl/etl.d/federated.json +++ b/configuration/etl/etl.d/federated.json @@ -74,6 +74,12 @@ "options_class": "IngestorOptions", "enabled": true }, + "ingest-historical": { + "class": "DatabaseIngestor", + "namespace": "ETL\\Ingestor", + "options_class": "IngestorOptions", + "enabled": true + }, "ingest-cloud": { "endpoints": { "destination": { @@ -106,6 +112,9 @@ "ingest": { "$ref": "etl_pipelines.d/federated.json#/ingest" }, + "ingest-historical": { + "$ref": "etl_pipelines.d/federated-historical.json#/ingest" + }, "ingest-cloud": { "$ref": "etl_pipelines.d/federated-cloud.json#/ingest" } diff --git a/configuration/etl/etl_action_defs.d/federated/jobs/job-records-historical.json b/configuration/etl/etl_action_defs.d/federated/jobs/job-records-historical.json new file mode 100644 index 0000000..acc1aa1 --- /dev/null +++ b/configuration/etl/etl_action_defs.d/federated/jobs/job-records-historical.json @@ -0,0 +1,88 @@ +{ + "table_definition": { + "$ref": "${table_definition_dir}/jobs/xdw/job-records.json#/table_definition" + }, + "source_query": { + "overseer_restrictions": { + "last_modified_start_date": "jr.last_modified >= ${VALUE}", + "last_modified_end_date": "jr.last_modified <= ${VALUE}" + }, + "records": { + "job_record_origin_id": "jr.job_record_id", + "resource_id": "rf.id", + "resourcetype_id": "jr.resourcetype_id", + "request_id": "alf.request_id", + "account_id": "af.id", + "allocation_id": "alf.id", + "allocation_resource_id": "alf.resource_id", + "fos_id": "jr.fos_id", + "queue": "CONCAT (jr.queue, ' (', jrof.abbrev ,'-', rf.code, ')')", + "person_id": "pf.id", + "person_organization_id": "pf.organization_id", + "person_nsfstatuscode_id": "pf.nsfstatuscode_id", + "principalinvestigator_person_id": "pif.id", + "piperson_organization_id": "pif.organization_id", + "resource_state_id": "jr.resource_state_id", + "resource_country_id": "jr.resource_country_id", + "resource_organization_id": "jrof.id", + "resource_organization_type_id": "COALESCE(jrof.organizationtype_id,0)", + "job_record_type_id": "jr.job_record_type_id", + "submission_venue_id": "jr.submission_venue_id", + "submit_time_ts": "jr.submit_time_ts", + "start_time_ts": "jr.start_time_ts", + "end_time_ts": "jr.end_time_ts", + "start_day_id": "jr.start_day_id", + "end_day_id": "jr.end_day_id", + "completed": "jr.completed", + "federation_instance_id": "${instance_id}" + }, + "joins": [ + { + "name": "job_records", + "schema": "${SOURCE_SCHEMA}", + "alias": "jr" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "organization", + "alias": "jrof", + "on": "jr.resource_organization_id = jrof.organization_origin_id AND jrof.federation_instance_id = ${instance_id}" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "resourcefact", + "alias": "rf", + "on": "jr.resource_id = rf.resource_origin_id AND rf.organization_id = jrof.id" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "person", + "alias": "pf", + "on": "pf.person_origin_id = jr.person_id AND pf.organization_id = (SELECT id FROM ${DESTINATION_SCHEMA}.organization WHERE jr.person_organization_id = organization_origin_id AND federation_instance_id = ${instance_id})" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "person", + "alias": "pif", + "on": "pif.person_origin_id = jr.principalinvestigator_person_id AND pif.organization_id = (SELECT id FROM ${DESTINATION_SCHEMA}.organization WHERE jr.piperson_organization_id = organization_origin_id AND federation_instance_id = ${instance_id})" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "account", + "alias": "af", + "on": "af.account_origin_id = jr.account_id AND af.federation_instance_id = ${instance_id}" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "allocation", + "alias": "alf", + "on": "alf.allocation_origin_id = jr.allocation_id AND alf.resource_id = rf.id" + } + ], + "macros": [ + { + "$ref": "etl_macros.d/federated/federated.json#/getInstanceId" + } + ] + } +} diff --git a/configuration/etl/etl_action_defs.d/federated/jobs/job-records-spoke.json b/configuration/etl/etl_action_defs.d/federated/jobs/job-records-spoke.json new file mode 100644 index 0000000..2607a18 --- /dev/null +++ b/configuration/etl/etl_action_defs.d/federated/jobs/job-records-spoke.json @@ -0,0 +1,53 @@ +{ + "table_definition": { + "$ref": "${table_definition_dir}/jobs/xdw/job-records.json#/table_definition" + }, + "source_query": { + "records": { + "job_record_id": "jr.job_record_id", + "job_record_origin_id": "jr.job_record_origin_id", + "resource_id": "jr.resource_id", + "resourcetype_id": "jr.resourcetype_id", + "resource_state_id": "jr.resource_state_id", + "resource_country_id": "jr.resource_country_id", + "resource_organization_id": "jr.resource_organization_id", + "resource_organization_type_id": "jr.resource_organization_type_id", + "allocation_resource_id": "jr.allocation_resource_id", + "person_id": "jr.person_id", + "person_organization_id": "jr.person_organization_id", + "person_nsfstatuscode_id": "jr.person_nsfstatuscode_id", + "account_id": "jr.account_id", + "allocation_id": "jr.allocation_id", + "request_id": "jr.request_id", + "fos_id": "jr.fos_id", + "principalinvestigator_person_id": "jr.principalinvestigator_person_id", + "piperson_organization_id": "jr.piperson_organization_id", + "job_record_type_id": "jr.job_record_type_id", + "submission_venue_id": "jr.submission_venue_id", + "queue": "jr.queue", + "submit_time_ts": "jr.submit_time_ts", + "start_time_ts": "jr.start_time_ts", + "end_time_ts": "jr.end_time_ts", + "start_day_id": "jr.start_day_id", + "end_day_id": "jr.end_day_id", + "local_charge_su": "jr.local_charge_su", + "adjusted_charge_su": "jr.adjusted_charge_su", + "local_charge_xdsu": "jr.local_charge_xdsu", + "adjusted_charge_xdsu": "jr.adjusted_charge_xdsu", + "local_charge_nu": "jr.local_charge_nu", + "adjusted_charge_nu": "jr.adjusted_charge_nu", + "conversion_factor": "jr.conversion_factor", + "completed": "jr.completed", + "federation_instance_id": "jr.federation_instance_id", + "last_modified": "jr.last_modified", + "is_deleted": "jr.is_deleted" + }, + "joins": [ + { + "name": "job_records_staging", + "schema": "${SOURCE_SCHEMA}", + "alias": "jr" + } + ] + } +} diff --git a/configuration/etl/etl_action_defs.d/federated/jobs/job-tasks-historical.json b/configuration/etl/etl_action_defs.d/federated/jobs/job-tasks-historical.json new file mode 100644 index 0000000..66aafde --- /dev/null +++ b/configuration/etl/etl_action_defs.d/federated/jobs/job-tasks-historical.json @@ -0,0 +1,86 @@ +{ + "table_definition": { + "$ref": "${table_definition_dir}/jobs/xdw/job-tasks.json#/table_definition" + }, + "source_query": { + "overseer_restrictions": { + "last_modified_start_date": "jt.last_modified >= ${VALUE}", + "last_modified_end_date": "jt.last_modified <= ${VALUE}" + }, + "records": { + "job_record_id": "jr.job_record_id", + "job_id_origin_id": "jt.job_id", + "creation_time": "jt.creation_time", + "local_jobid": "jt.local_jobid", + "job_task_type_id": "jt.job_task_type_id", + "resource_id": "rf.id", + "local_job_array_index": "jt.local_job_array_index", + "local_job_id_raw": "jt.local_job_id_raw", + "name": "jt.name", + "node_count": "jt.node_count", + "processor_count": "jt.processor_count", + "gpu_count": "jt.gpu_count", + "systemaccount_id": "jt.systemaccount_id", + "person_id": "pf.id", + "person_organization_id": "pf.organization_id", + "person_nsfstatuscode_id": "pf.nsfstatuscode_id", + "wallduration": "jt.wallduration", + "waitduration": "jt.waitduration", + "cpu_time": "jt.cpu_time", + "gpu_time": "jt.gpu_time", + "submit_time_ts": "jt.submit_time_ts", + "start_time_ts": "jt.start_time_ts", + "end_time_ts": "jt.end_time_ts", + "eligible_time_ts": "jt.eligible_time_ts", + "start_day_id": "jt.start_day_id", + "end_day_id": "jt.end_day_id", + "eligible_day_id": "jt.eligible_day_id", + "group_name": "jt.group_name", + "gid_number": "jt.gid_number", + "uid_number": "jt.uid_number", + "exit_code": "jt.exit_code", + "exit_state": "jt.exit_state", + "cpu_req": "jt.cpu_req", + "mem_req": "jt.mem_req", + "timelimit": "jt.timelimit", + "memory_kb": "jt.memory_kb", + "completed": "jt.completed" + }, + "joins": [ + { + "name": "job_tasks", + "schema": "${SOURCE_SCHEMA}", + "alias": "jt" + }, + { + "name": "job_records", + "schema": "${DESTINATION_SCHEMA}", + "alias": "jr", + "on": "jr.job_record_origin_id = jt.job_record_id AND jr.federation_instance_id = ${instance_id}" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "organization", + "alias": "destorg", + "on": "destorg.federation_instance_id = ${instance_id}" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "resourcefact", + "alias": "rf", + "on": "jt.resource_id = rf.resource_origin_id AND rf.`organization_id` = destorg.id AND destorg.`federation_instance_id` = ${instance_id} " + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "person", + "alias": "pf", + "on": "pf.person_origin_id = jt.person_id AND pf.organization_id = (SELECT id FROM ${DESTINATION_SCHEMA}.organization WHERE jt.person_organization_id = organization_origin_id AND federation_instance_id = ${instance_id})" + } + ], + "macros": [ + { + "$ref": "etl_macros.d/federated/federated.json#/getInstanceId" + } + ] + } +} diff --git a/configuration/etl/etl_action_defs.d/federated/jobs/job-tasks-spoke.json b/configuration/etl/etl_action_defs.d/federated/jobs/job-tasks-spoke.json new file mode 100644 index 0000000..3971da0 --- /dev/null +++ b/configuration/etl/etl_action_defs.d/federated/jobs/job-tasks-spoke.json @@ -0,0 +1,63 @@ +{ + "table_definition": { + "$ref": "${table_definition_dir}/jobs/xdw/job-tasks.json#/table_definition" + }, + "source_query": { + "records": { + "job_record_id": "jt.job_record_id", + "job_id": "jt.job_id", + "job_id_origin_id": "jt.job_id_origin_id", + "creation_time": "jt.creation_time", + "local_jobid": "jt.local_jobid", + "local_job_array_index": "jt.local_job_array_index", + "local_job_id_raw": "jt.local_job_id_raw", + "resource_id": "jt.resource_id", + "systemaccount_id": "jt.systemaccount_id", + "person_id": "jt.person_id", + "person_organization_id": "jt.person_organization_id", + "person_nsfstatuscode_id": "jt.person_nsfstatuscode_id", + "job_task_type_id": "jt.job_task_type_id", + "name": "jt.name", + "submit_time_ts": "jt.submit_time_ts", + "start_time_ts": "jt.start_time_ts", + "end_time_ts": "jt.end_time_ts", + "eligible_time_ts": "jt.eligible_time_ts", + "start_day_id": "jt.start_day_id", + "end_day_id": "jt.end_day_id", + "eligible_day_id": "jt.eligible_day_id", + "node_count": "jt.node_count", + "processor_count": "jt.processor_count", + "gpu_count": "jt.gpu_count", + "memory_kb": "jt.memory_kb", + "wallduration": "jt.wallduration", + "waitduration": "jt.waitduration", + "cpu_time": "jt.cpu_time", + "gpu_time": "jt.gpu_time", + "local_charge_su": "jt.local_charge_su", + "adjusted_charge_su": "jt.adjusted_charge_su", + "local_charge_xdsu": "jt.local_charge_xdsu", + "adjusted_charge_xdsu": "jt.adjusted_charge_xdsu", + "local_charge_nu": "jt.local_charge_nu", + "adjusted_charge_nu": "jt.adjusted_charge_nu", + "group_name": "jt.group_name", + "gid_number": "jt.gid_number", + "uid_number": "jt.uid_number", + "exit_code": "jt.exit_code", + "exit_state": "jt.exit_state", + "cpu_req": "jt.cpu_req", + "mem_req": "jt.mem_req", + "timelimit": "jt.timelimit", + "coversion_factor": "jt.conversion_factor", + "completed": "jt.completed", + "last_modified": "jt.last_modified", + "is_deleted": "jt.is_deleted" + }, + "joins": [ + { + "name": "job_tasks_staging", + "schema": "${SOURCE_SCHEMA}", + "alias": "jt" + } + ] + } +} diff --git a/configuration/etl/etl_action_defs.d/federated/jobs/jobhosts-historical.json b/configuration/etl/etl_action_defs.d/federated/jobs/jobhosts-historical.json new file mode 100644 index 0000000..d14d68d --- /dev/null +++ b/configuration/etl/etl_action_defs.d/federated/jobs/jobhosts-historical.json @@ -0,0 +1,70 @@ +{ + "table_definition": { + "$ref": "${table_definition_dir}/jobs/xdw/jobhosts.json#/table_definition" + }, + "source_query": { + "overseer_restrictions": { + "last_modified_start_date": "jt.last_modified >= ${VALUE}", + "last_modified_end_date": "jt.last_modified <= ${VALUE}" + }, + "records": { + "job_id": "djt.job_id", + "host_id": "dh.id", + "order_id": "sjh.order_id" + }, + "joins": [ + { + "schema": "${SOURCE_SCHEMA}", + "name": "job_tasks", + "alias": "jt" + }, + { + "schema": "${SOURCE_SCHEMA}", + "name": "jobhosts", + "alias": "sjh", + "on": "jt.job_id = sjh.job_id" + }, + { + "schema": "${SOURCE_SCHEMA}", + "name": "hosts", + "alias": "sh", + "on": "sjh.host_id = sh.id" + }, + { + "schema": "${SOURCE_SCHEMA}", + "name": "resourcefact", + "alias": "sr", + "on": "sh.resource_id = sr.id" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "organization", + "alias": "do", + "on": "do.organization_origin_id = sr.organization_id AND federation_instance_id = ${instance_id}" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "resourcefact", + "alias": "drf", + "on": "drf.organization_id = do.id and drf.resource_origin_id = sh.resource_id" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "job_tasks", + "alias": "djt", + "on": "djt.job_id_origin_id = sjh.job_id AND djt.resource_id = drf.id" + }, + { + "schema": "${DESTINATION_SCHEMA}", + "name": "hosts", + "alias": "dh", + "on": "dh.resource_id = drf.id AND sh.id = dh.host_origin_id" + } + ], + "macros": [ + { + "$ref": "etl_macros.d/federated/federated.json#/getInstanceId" + } + ] + } +} diff --git a/configuration/etl/etl_pipelines.d/federated-historical.json b/configuration/etl/etl_pipelines.d/federated-historical.json new file mode 100644 index 0000000..629f176 --- /dev/null +++ b/configuration/etl/etl_pipelines.d/federated-historical.json @@ -0,0 +1,116 @@ +{ + "description": "Federation Jobs Ingest", + "ingest": [ + { + "name": "instance-ogranization", + "definition_file": "federated/jobs/organization.json", + "description": "Organization *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-resource", + "definition_file": "federated/jobs/resource-fact.json", + "description": "Resource *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-queue", + "definition_file": "federated/jobs/queue.json", + "description": "Queue *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-resource-spec", + "definition_file": "federated/jobs/resource-specs.json", + "description": "Resource Spec *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-resource-allocated", + "definition_file": "federated/jobs/resource-allocated.json", + "description": "Resource Allocation *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-hosts", + "definition_file": "federated/jobs/hosts.json", + "description": "Hosts *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-person", + "definition_file": "federated/jobs/person.json", + "description": "Person *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-piperson", + "definition_file": "federated/jobs/piperson.json", + "description": "PI Person *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-account", + "definition_file": "federated/jobs/account.json", + "description": "Account *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-request", + "definition_file": "federated/jobs/request.json", + "description": "Request *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-allocation", + "definition_file": "federated/jobs/allocation.json", + "description": "Allocation *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-allocation-on-resource", + "definition_file": "federated/jobs/allocation-on-resource.json", + "description": "Allocation on Resource *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-allocation-breakdown", + "definition_file": "federated/jobs/allocation-breakdown.json", + "description": "Allocation Breakdown *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-system-account", + "definition_file": "federated/jobs/system-account.json", + "description": "System Account *requires: -d instance_name*", + "truncate_destination": false + }, + { + "name": "instance-job-record", + "definition_file": "federated/jobs/job-records-historical.json", + "description": "Job records *requires: -d instance_name*", + "analyze_table": false, + "truncate_destination": false + }, + { + "name": "instance-job-task", + "definition_file": "federated/jobs/job-tasks-historical.json", + "description": "Job tasks *requires: -d instance_name*", + "analyze_table": false, + "truncate_destination": false + }, + { + "name": "instance-jobhosts", + "definition_file": "federated/jobs/jobhosts-historical.json", + "description": "Job Hosts *requires: -d instance_name*", + "analyze_table": false, + "truncate_destination": false + }, + { + "name": "instance-pi", + "definition_file": "federated/jobs/principal-investigator.json", + "description": "pi *requires: -d instance_name*", + "truncate_destination": false + } + ] +} diff --git a/configuration/etl/etl_pipelines.d/federated.json b/configuration/etl/etl_pipelines.d/federated.json index 1ec1f6a..064e089 100644 --- a/configuration/etl/etl_pipelines.d/federated.json +++ b/configuration/etl/etl_pipelines.d/federated.json @@ -111,6 +111,36 @@ "definition_file": "federated/jobs/principal-investigator.json", "description": "pi *requires: -d instance_name*", "truncate_destination": false + }, + { + "name": "instance-job-task-spoke", + "definition_file": "federated/jobs/job-tasks-spoke.json", + "description": "Job tasks *requires: -d instance_name*", + "endpoints": { + "destination": { + "type": "mysql", + "name": "cloud instance destination", + "config": "datawarehouse", + "schema": "${instance_name}-modw", + "create_schema_if_not_exists": true + } + }, + "truncate_destination": false + }, + { + "name": "instance-job-records-spoke", + "definition_file": "federated/jobs/job-records-spoke.json", + "description": "Job tasks *requires: -d instance_name*", + "endpoints": { + "destination": { + "type": "mysql", + "name": "cloud instance destination", + "config": "datawarehouse", + "schema": "${instance_name}-modw", + "create_schema_if_not_exists": true + } + }, + "truncate_destination": false } ] }