#1028 Update comments for gcc puller functions and update README.md
leo-oxu authored and gabrielwol committed Oct 7, 2024
1 parent 3e726f8 commit bbb376d
Showing 2 changed files with 70 additions and 27 deletions.
37 changes: 30 additions & 7 deletions gis/gccview/README.md
@@ -80,15 +80,18 @@ The GCC pipeline will be pulling multiple layers into the `gis_core` and `gis` s

## Data Pipeline

The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means:
The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in eight parameters. Here is a list that describes what each parameter means:

- mapserver_n (int): ID of the mapserver that hosts the desired layer
- layer_id (int): ID of the layer within the particular mapserver
- schema_name (string): name of destination schema
- is_audited (Boolean): True if the layer will be in a table that is audited, False if the layer will be inserted as a child table part of a parent table
- is_audited (Boolean): True if the layer will be loaded into an audited table, False if the layer will be non-audited
- cred (Airflow PostgresHook): the Airflow PostgresHook that supplies the credentials for connecting to a particular database
- con (used when pulling manually): the path to the credential config file. Default is ~/db.cfg
- primary_key (used when pulling an audited table): primary key for this layer; it is taken from the dictionary pk_dict when pulling for the Airflow DAG, and set manually when pulling a layer yourself
- is_partitioned (Boolean): True if the layer will be inserted as a child table of a parent table, False if the layer will be neither audited nor partitioned
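To make the parameters concrete, here is a minimal sketch of a manual (non-Airflow) call to `get_layer`. The layer numbers mirror the library example further below; `con` is built as a psycopg2 connection object, per the function's docstring, and the credential-file section name is an assumption, so adjust it to your own `db.cfg`:

```python
# A sketch only: manual pull of a plain (neither audited nor partitioned) layer.
import configparser

import psycopg2

from gcc_puller_functions import get_layer  # assumes the script is importable

# Build a database connection from the credential config file described above;
# the section name 'DBSETTINGS' is an assumption, not taken from this repo.
config = configparser.ConfigParser()
config.read('db.cfg')
con = psycopg2.connect(**config['DBSETTINGS'])

get_layer(
    mapserver_n=28,        # ID of the mapserver that hosts the layer
    layer_id=28,           # ID of the layer within that mapserver
    schema_name='gis',     # destination schema
    is_audited=False,
    con=con,
    is_partitioned=False,  # plain table: neither audited nor partitioned
)
```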

In the DAG file, the arguments for each layer are stored in dictionaries called "bigdata_layers" and "ptc_layers", in the order above. The DAG will be executed once every 3 months, particularly on the 15th of every March, June, September, and December every year.
In the DAG file, the arguments for each layer are stored in dictionaries called "bigdata_layers" and "ptc_layers", in the order above. The DAG is executed once every 3 months, on the 15th of March, June, September, and December. The DAG therefore only pulls audited or partitioned tables, since the "is_partitioned" argument is not stored in the dictionaries and is left at its default value of True.
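As a rough sketch, that quarterly schedule corresponds to an Airflow cron expression like the one below; the `dag_id` and start date are illustrative, not taken from `/dags/gcc_layers_pull.py`:

```python
# Illustrative only: a DAG that runs at 00:00 on the 15th of Mar, Jun, Sep, Dec.
import pendulum
from airflow import DAG

with DAG(
    dag_id="gcc_layers_pull",               # assumed from the DAG file name
    schedule_interval="0 0 15 3,6,9,12 *",  # 15th of March, June, September, December
    start_date=pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
    catchup=False,
) as dag:
    ...                                     # layer-pulling tasks go here
```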

## Adding new layers to GCC Puller DAG
1. Identify the mapserver_n and layer_id for the layer you wish to add. You can find COT transportation layers here: https://insideto-gis.toronto.ca/arcgis/rest/services/cot_geospatial2/FeatureServer, where mapserver_n is 2 and the layer_id is in brackets after the layer name.
@@ -97,7 +100,7 @@ In the DAG file, the arguments for each layer are stored in dictionaries called

## Manually fetch layers

If you need to pull a layer as a one-off task, this script allows you to pull any layer from the GCC Rest API. Please note that the script must be run locally or on a on-prem server as it needs connection to insideto.
If you need to pull a layer as a one-off task, `gcc_puller_functions.py` allows you to pull any layer from the GCC Rest API. Please note that the script must be run locally or on an on-prem server, as it needs a connection to insideto.

Before running the script, ensure that you have set up the appropriate environment with all necessary packages installed. You might have to set the `https_proxy` in your environment with your novell username and password in order to clone this repo or install packages. If you run into any issues, don't hesitate to ask a sysadmin. You can then install all packages listed in the `requirement.txt`, either with:
1) Activate your virtual environment; it should automatically install them for you
@@ -120,7 +123,7 @@ Before running the script, ensure that you have set up the appropriate environme

Now you are set to run the script!

There are 4 inputs that need to be entered.
There are 7 inputs that can be entered.

`--mapserver`: Mapserver number, e.g. cotgeospatial_2 will be 2

@@ -130,9 +133,29 @@ There are 4 inputs that need to be entered.

`--con`(optional): The path to the credential config file. Default is ~/db.cfg

Example of pulling the library layer to the gis schema.
`--is-audited`: Whether the table will be audited; specifying the flag on the command line sets this option to True, while omitting it leaves the default False.

`--primary-key` (required when pulling an audited table): Primary key for the layer

`--is-partitioned`: Whether the table will be a child table of a parent table (partitioned) or a plain table; specifying the flag on the command line sets this option to True, while omitting it leaves the default False.

Example of pulling the library layer (a plain table, neither audited nor partitioned) to the gis schema.


```bash
python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg
```

Example of pulling the intersection layer (partitioned) to the gis_core schema.


```bash
python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 12 --layer-id 42 --schema-name gis_core --con db.cfg --is-partitioned
```

Example of pulling the city_ward layer (audited) to the gis_core schema.


```bash
python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 0 --layer-id 0 --schema-name gis_core --con db.cfg --is-audited --primary-key area_id
```
60 changes: 40 additions & 20 deletions gis/gccview/gcc_puller_functions.py
@@ -42,10 +42,10 @@ def get_tablename(mapserver, layer_id):
Parameters
-----------
mapserver: string
mapserver : string
The name of the mapserver we are accessing, returned from function mapserver_name
layer_id: integer
layer_id : integer
Unique layer id that represents a single layer in the mapserver
Returns
@@ -103,7 +103,7 @@ def create_audited_table(output_table, return_json, schema_name, primary_key, co
primary_key : string
Primary key for this layer, returned from dictionary pk_dict
con: Airflow Connection
con : Airflow Connection
Could be the connection to bigdata or to on-prem server
Returns
@@ -141,8 +141,8 @@ def create_audited_table(output_table, return_json, schema_name, primary_key, co
LOGGER.info(create_sql.as_string(con))
cur.execute(create_sql)

# owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name))
# cur.execute(owner_sql)
owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name))
cur.execute(owner_sql)

# Add a pk
with con:
@@ -166,7 +166,7 @@ def create_partitioned_table(output_table, return_json, schema_name, con):
schema_name : string
The schema in which the table will be inserted into
con: Airflow Connection
con : Airflow Connection
Could be the connection to bigdata or to on-prem server
Returns
@@ -206,7 +206,7 @@ def create_partitioned_table(output_table, return_json, schema_name, con):

def create_table(output_table, return_json, schema_name, con):
"""
Function to create a new table in postgresql for the layer (for audited tables only)
Function to create a new table in postgresql for the layer (for regular tables)
Parameter
---------
@@ -219,19 +219,13 @@
schema_name : string
The schema in which the table will be inserted into
primary_key : string
Primary key for this layer, returned from dictionary pk_dict
con: Connection
con : Connection
Could be the connection to bigdata or to on-prem server
Returns
--------
insert_column : SQL composed
Composed object of column names and types, used for creating a new postgresql table
excluded_column : SQL composed
Composed object that is similar to insert_column, but has 'EXCLUDED.' attached before each column name, used for UPSERT query
"""

fields = return_json['fields']
@@ -391,6 +385,29 @@ def find_limit(return_json):
return keep_adding

def insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned):
"""
Function to insert data into our postgresql database
Parameters
----------
output_table : string
Table name for postgresql, returned from function get_tablename
insert_column : SQL composed
Composed object of column names and types, used for creating a new postgresql table
return_json : json
Resulted json response from calling the api, returned from function get_data
schema_name : string
The schema in which the table will be inserted into
con : Airflow Connection
Could be the connection to bigdata or to on-prem server
is_audited : Boolean
Whether we want to have the table be audited (true) or be non-audited (false)
is_partitioned : Boolean
Whether we want to have the table be partitioned (true) or neither audited nor partitioned (false)
"""
rows = []
features = return_json['features']
fields = return_json['fields']
@@ -450,7 +467,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche
schema_name : string
The schema in which the table will be inserted into
con: Airflow Connection
con : Airflow Connection
Could be the connection to bigdata or to on-prem server
Returns
@@ -526,7 +543,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche
return successful_execution
#-------------------------------------------------------------------------------------------------------
# base main function, also compatible with Airflow
def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = False):
def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = True):
"""
This function calls the GCCview rest API and inserts the outputs to the output table in the postgres database.
@@ -541,16 +558,19 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
schema_name : string
The schema in which the table will be inserted into
is_audited: Boolean
Whether we want to have the table be audited (true) or be partitioned (false)
is_audited : Boolean
Whether we want to have the table be audited (true) or be non-audited (false)
cred: Airflow PostgresHook
cred : Airflow PostgresHook
Contains credentials to enable a connection to a database
Expects a valid cred input when running Airflow DAG
con: connection to database
con : connection to database
Connection object that can connect to a particular database
Expects a valid con object if using command prompt
is_partitioned : Boolean
Whether we want to have the table be partitioned (true) or neither audited nor partitioned (false)
"""

# For Airflow DAG
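For orientation, a DAG task could wrap `get_layer` roughly as follows. This is a sketch, not the actual DAG: the task name and Airflow connection id are hypothetical, and the arguments mirror the city_ward example from the README above.

```python
# A sketch of an Airflow task calling get_layer for an audited layer.
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

from gcc_puller_functions import get_layer

@task
def pull_city_ward():
    # Hypothetical Airflow connection id; the hook supplies DB credentials.
    cred = PostgresHook(postgres_conn_id="gcc_bot")
    get_layer(
        mapserver_n=0,
        layer_id=0,
        schema_name="gis_core",
        is_audited=True,
        cred=cred,
        primary_key="area_id",  # pk for city_ward, per the README example
    )
```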
