Skip to content

Commit

Permalink
#1028 remove unneccessary packages
Browse files Browse the repository at this point in the history
  • Loading branch information
chmnata authored and gabrielwol committed Oct 7, 2024
1 parent 1d39add commit 2e69d5f
Showing 1 changed file with 16 additions and 32 deletions.
48 changes: 16 additions & 32 deletions gis/gccview/gcc_puller_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from psycopg2 import sql
from psycopg2.extras import execute_values
import logging
from time import sleep
from airflow.exceptions import AirflowFailException
import click
CONFIG = configparser.ConfigParser()

Expand Down Expand Up @@ -311,7 +309,6 @@ def get_data(mapserver, layer_id, max_number = None, record_max = None):
LOGGER.error("Invalid HTTP response: ", err_h)
except requests.exceptions.ConnectionError as err_c:
LOGGER.error("Network problem: ", err_c)
sleep(10)
except requests.exceptions.Timeout as err_t:
LOGGER.error("Timeout: ", err_t)
except requests.exceptions.RequestException as err:
Expand Down Expand Up @@ -613,38 +610,25 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
features = return_json['features']
record_max=(len(features))
max_number = record_max

if is_audited:
insert_audited_data(output_table, insert_column, return_json, schema_name, con)
else:
insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con)

counter += 1
keep_adding = find_limit(return_json)
if keep_adding == False:
LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)
# Insert data into the table
if is_audited:
insert_audited_data(output_table, insert_column, return_json, schema_name, con)
else:
return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max)
if is_audited:
insert_audited_data(output_table, insert_column, return_json, schema_name, con)
else:
insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con)

counter += 1
keep_adding = find_limit(return_json)
if keep_adding == True:
max_number = max_number + record_max
else:
LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)
insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con)
# Update loop variables
counter += 1
keep_adding = find_limit(return_json)

if keep_adding:
max_number += record_max
else:
LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)

if is_audited:
successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con)

'''
# Raise error if UPSERT failed
if not successful_task_run:
raise AirflowFailException
'''
try:
successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con)
except Exception as err:
LOGGER.exception("Unable to update table %s", err)

@click.command()
@click.option('--mapserver', '-ms', type = int, required = True,
Expand Down

0 comments on commit 2e69d5f

Please sign in to comment.