Skip to content

Commit

Permalink
Merge pull request #21 from eiffel777/add-historical-ingestion-action…
Browse files Browse the repository at this point in the history
…s-pipeline

Adding pipelines and actions for historical ingestion of data
  • Loading branch information
eiffel777 authored Jul 6, 2021
2 parents dd9af93 + d45547e commit 151a519
Show file tree
Hide file tree
Showing 8 changed files with 515 additions and 0 deletions.
9 changes: 9 additions & 0 deletions configuration/etl/etl.d/federated.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
"options_class": "IngestorOptions",
"enabled": true
},
"ingest-historical": {
"class": "DatabaseIngestor",
"namespace": "ETL\\Ingestor",
"options_class": "IngestorOptions",
"enabled": true
},
"ingest-cloud": {
"endpoints": {
"destination": {
Expand Down Expand Up @@ -106,6 +112,9 @@
"ingest": {
"$ref": "etl_pipelines.d/federated.json#/ingest"
},
"ingest-historical": {
"$ref": "etl_pipelines.d/federated-historical.json#/ingest"
},
"ingest-cloud": {
"$ref": "etl_pipelines.d/federated-cloud.json#/ingest"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
{
"table_definition": {
"$ref": "${table_definition_dir}/jobs/xdw/job-records.json#/table_definition"
},
"source_query": {
"overseer_restrictions": {
"last_modified_start_date": "jr.last_modified >= ${VALUE}",
"last_modified_end_date": "jr.last_modified <= ${VALUE}"
},
"records": {
"job_record_origin_id": "jr.job_record_id",
"resource_id": "rf.id",
"resourcetype_id": "jr.resourcetype_id",
"request_id": "alf.request_id",
"account_id": "af.id",
"allocation_id": "alf.id",
"allocation_resource_id": "alf.resource_id",
"fos_id": "jr.fos_id",
"queue": "CONCAT (jr.queue, ' (', jrof.abbrev ,'-', rf.code, ')')",
"person_id": "pf.id",
"person_organization_id": "pf.organization_id",
"person_nsfstatuscode_id": "pf.nsfstatuscode_id",
"principalinvestigator_person_id": "pif.id",
"piperson_organization_id": "pif.organization_id",
"resource_state_id": "jr.resource_state_id",
"resource_country_id": "jr.resource_country_id",
"resource_organization_id": "jrof.id",
"resource_organization_type_id": "COALESCE(jrof.organizationtype_id,0)",
"job_record_type_id": "jr.job_record_type_id",
"submission_venue_id": "jr.submission_venue_id",
"submit_time_ts": "jr.submit_time_ts",
"start_time_ts": "jr.start_time_ts",
"end_time_ts": "jr.end_time_ts",
"start_day_id": "jr.start_day_id",
"end_day_id": "jr.end_day_id",
"completed": "jr.completed",
"federation_instance_id": "${instance_id}"
},
"joins": [
{
"name": "job_records",
"schema": "${SOURCE_SCHEMA}",
"alias": "jr"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "organization",
"alias": "jrof",
"on": "jr.resource_organization_id = jrof.organization_origin_id AND jrof.federation_instance_id = ${instance_id}"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "resourcefact",
"alias": "rf",
"on": "jr.resource_id = rf.resource_origin_id AND rf.organization_id = jrof.id"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "person",
"alias": "pf",
"on": "pf.person_origin_id = jr.person_id AND pf.organization_id = (SELECT id FROM ${DESTINATION_SCHEMA}.organization WHERE jr.person_organization_id = organization_origin_id AND federation_instance_id = ${instance_id})"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "person",
"alias": "pif",
"on": "pif.person_origin_id = jr.principalinvestigator_person_id AND pif.organization_id = (SELECT id FROM ${DESTINATION_SCHEMA}.organization WHERE jr.piperson_organization_id = organization_origin_id AND federation_instance_id = ${instance_id})"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "account",
"alias": "af",
"on": "af.account_origin_id = jr.account_id AND af.federation_instance_id = ${instance_id}"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "allocation",
"alias": "alf",
"on": "alf.allocation_origin_id = jr.allocation_id AND alf.resource_id = rf.id"
}
],
"macros": [
{
"$ref": "etl_macros.d/federated/federated.json#/getInstanceId"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"table_definition": {
"$ref": "${table_definition_dir}/jobs/xdw/job-records.json#/table_definition"
},
"source_query": {
"records": {
"job_record_id": "jr.job_record_id",
"job_record_origin_id": "jr.job_record_origin_id",
"resource_id": "jr.resource_id",
"resourcetype_id": "jr.resourcetype_id",
"resource_state_id": "jr.resource_state_id",
"resource_country_id": "jr.resource_country_id",
"resource_organization_id": "jr.resource_organization_id",
"resource_organization_type_id": "jr.resource_organization_type_id",
"allocation_resource_id": "jr.allocation_resource_id",
"person_id": "jr.person_id",
"person_organization_id": "jr.person_organization_id",
"person_nsfstatuscode_id": "jr.person_nsfstatuscode_id",
"account_id": "jr.account_id",
"allocation_id": "jr.allocation_id",
"request_id": "jr.request_id",
"fos_id": "jr.fos_id",
"principalinvestigator_person_id": "jr.principalinvestigator_person_id",
"piperson_organization_id": "jr.piperson_organization_id",
"job_record_type_id": "jr.job_record_type_id",
"submission_venue_id": "jr.submission_venue_id",
"queue": "jr.queue",
"submit_time_ts": "jr.submit_time_ts",
"start_time_ts": "jr.start_time_ts",
"end_time_ts": "jr.end_time_ts",
"start_day_id": "jr.start_day_id",
"end_day_id": "jr.end_day_id",
"local_charge_su": "jr.local_charge_su",
"adjusted_charge_su": "jr.adjusted_charge_su",
"local_charge_xdsu": "jr.local_charge_xdsu",
"adjusted_charge_xdsu": "jr.adjusted_charge_xdsu",
"local_charge_nu": "jr.local_charge_nu",
"adjusted_charge_nu": "jr.adjusted_charge_nu",
"conversion_factor": "jr.conversion_factor",
"completed": "jr.completed",
"federation_instance_id": "jr.federation_instance_id",
"last_modified": "jr.last_modified",
"is_deleted": "jr.is_deleted"
},
"joins": [
{
"name": "job_records_staging",
"schema": "${SOURCE_SCHEMA}",
"alias": "jr"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{
"table_definition": {
"$ref": "${table_definition_dir}/jobs/xdw/job-tasks.json#/table_definition"
},
"source_query": {
"overseer_restrictions": {
"last_modified_start_date": "jt.last_modified >= ${VALUE}",
"last_modified_end_date": "jt.last_modified <= ${VALUE}"
},
"records": {
"job_record_id": "jr.job_record_id",
"job_id_origin_id": "jt.job_id",
"creation_time": "jt.creation_time",
"local_jobid": "jt.local_jobid",
"job_task_type_id": "jt.job_task_type_id",
"resource_id": "rf.id",
"local_job_array_index": "jt.local_job_array_index",
"local_job_id_raw": "jt.local_job_id_raw",
"name": "jt.name",
"node_count": "jt.node_count",
"processor_count": "jt.processor_count",
"gpu_count": "jt.gpu_count",
"systemaccount_id": "jt.systemaccount_id",
"person_id": "pf.id",
"person_organization_id": "pf.organization_id",
"person_nsfstatuscode_id": "pf.nsfstatuscode_id",
"wallduration": "jt.wallduration",
"waitduration": "jt.waitduration",
"cpu_time": "jt.cpu_time",
"gpu_time": "jt.gpu_time",
"submit_time_ts": "jt.submit_time_ts",
"start_time_ts": "jt.start_time_ts",
"end_time_ts": "jt.end_time_ts",
"eligible_time_ts": "jt.eligible_time_ts",
"start_day_id": "jt.start_day_id",
"end_day_id": "jt.end_day_id",
"eligible_day_id": "jt.eligible_day_id",
"group_name": "jt.group_name",
"gid_number": "jt.gid_number",
"uid_number": "jt.uid_number",
"exit_code": "jt.exit_code",
"exit_state": "jt.exit_state",
"cpu_req": "jt.cpu_req",
"mem_req": "jt.mem_req",
"timelimit": "jt.timelimit",
"memory_kb": "jt.memory_kb",
"completed": "jt.completed"
},
"joins": [
{
"name": "job_tasks",
"schema": "${SOURCE_SCHEMA}",
"alias": "jt"
},
{
"name": "job_records",
"schema": "${DESTINATION_SCHEMA}",
"alias": "jr",
"on": "jr.job_record_origin_id = jt.job_record_id AND jr.federation_instance_id = ${instance_id}"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "organization",
"alias": "destorg",
"on": "destorg.federation_instance_id = ${instance_id}"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "resourcefact",
"alias": "rf",
"on": "jt.resource_id = rf.resource_origin_id AND rf.`organization_id` = destorg.id AND destorg.`federation_instance_id` = ${instance_id} "
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "person",
"alias": "pf",
"on": "pf.person_origin_id = jt.person_id AND pf.organization_id = (SELECT id FROM ${DESTINATION_SCHEMA}.organization WHERE jt.person_organization_id = organization_origin_id AND federation_instance_id = ${instance_id})"
}
],
"macros": [
{
"$ref": "etl_macros.d/federated/federated.json#/getInstanceId"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"table_definition": {
"$ref": "${table_definition_dir}/jobs/xdw/job-tasks.json#/table_definition"
},
"source_query": {
"records": {
"job_record_id": "jt.job_record_id",
"job_id": "jt.job_id",
"job_id_origin_id": "jt.job_id_origin_id",
"creation_time": "jt.creation_time",
"local_jobid": "jt.local_jobid",
"local_job_array_index": "jt.local_job_array_index",
"local_job_id_raw": "jt.local_job_id_raw",
"resource_id": "jt.resource_id",
"systemaccount_id": "jt.systemaccount_id",
"person_id": "jt.person_id",
"person_organization_id": "jt.person_organization_id",
"person_nsfstatuscode_id": "jt.person_nsfstatuscode_id",
"job_task_type_id": "jt.job_task_type_id",
"name": "jt.name",
"submit_time_ts": "jt.submit_time_ts",
"start_time_ts": "jt.start_time_ts",
"end_time_ts": "jt.end_time_ts",
"eligible_time_ts": "jt.eligible_time_ts",
"start_day_id": "jt.start_day_id",
"end_day_id": "jt.end_day_id",
"eligible_day_id": "jt.eligible_day_id",
"node_count": "jt.node_count",
"processor_count": "jt.processor_count",
"gpu_count": "jt.gpu_count",
"memory_kb": "jt.memory_kb",
"wallduration": "jt.wallduration",
"waitduration": "jt.waitduration",
"cpu_time": "jt.cpu_time",
"gpu_time": "jt.gpu_time",
"local_charge_su": "jt.local_charge_su",
"adjusted_charge_su": "jt.adjusted_charge_su",
"local_charge_xdsu": "jt.local_charge_xdsu",
"adjusted_charge_xdsu": "jt.adjusted_charge_xdsu",
"local_charge_nu": "jt.local_charge_nu",
"adjusted_charge_nu": "jt.adjusted_charge_nu",
"group_name": "jt.group_name",
"gid_number": "jt.gid_number",
"uid_number": "jt.uid_number",
"exit_code": "jt.exit_code",
"exit_state": "jt.exit_state",
"cpu_req": "jt.cpu_req",
"mem_req": "jt.mem_req",
"timelimit": "jt.timelimit",
"coversion_factor": "jt.conversion_factor",
"completed": "jt.completed",
"last_modified": "jt.last_modified",
"is_deleted": "jt.is_deleted"
},
"joins": [
{
"name": "job_tasks_staging",
"schema": "${SOURCE_SCHEMA}",
"alias": "jt"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{
"table_definition": {
"$ref": "${table_definition_dir}/jobs/xdw/jobhosts.json#/table_definition"
},
"source_query": {
"overseer_restrictions": {
"last_modified_start_date": "jt.last_modified >= ${VALUE}",
"last_modified_end_date": "jt.last_modified <= ${VALUE}"
},
"records": {
"job_id": "djt.job_id",
"host_id": "dh.id",
"order_id": "sjh.order_id"
},
"joins": [
{
"schema": "${SOURCE_SCHEMA}",
"name": "job_tasks",
"alias": "jt"
},
{
"schema": "${SOURCE_SCHEMA}",
"name": "jobhosts",
"alias": "sjh",
"on": "jt.job_id = sjh.job_id"
},
{
"schema": "${SOURCE_SCHEMA}",
"name": "hosts",
"alias": "sh",
"on": "sjh.host_id = sh.id"
},
{
"schema": "${SOURCE_SCHEMA}",
"name": "resourcefact",
"alias": "sr",
"on": "sh.resource_id = sr.id"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "organization",
"alias": "do",
"on": "do.organization_origin_id = sr.organization_id AND federation_instance_id = ${instance_id}"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "resourcefact",
"alias": "drf",
"on": "drf.organization_id = do.id and drf.resource_origin_id = sh.resource_id"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "job_tasks",
"alias": "djt",
"on": "djt.job_id_origin_id = sjh.job_id AND djt.resource_id = drf.id"
},
{
"schema": "${DESTINATION_SCHEMA}",
"name": "hosts",
"alias": "dh",
"on": "dh.resource_id = drf.id AND sh.id = dh.host_origin_id"
}
],
"macros": [
{
"$ref": "etl_macros.d/federated/federated.json#/getInstanceId"
}
]
}
}
Loading

0 comments on commit 151a519

Please sign in to comment.