Skip to content

Commit

Permalink
Fix dump to ckan
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Jun 6, 2024
1 parent 2b46025 commit 79bb813
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions operators/derive/to_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from srm_tools.logger import logger
from srm_tools.unwind import unwind

CHECKPOINT = 'to_es'

def card_score(row):
branch_count = row['organization_branch_count'] or 1
national_service = bool(row['national_service'])
Expand Down Expand Up @@ -45,8 +47,10 @@ def card_score(row):
return score

def data_api_es_flow():
checkpoint = f'{CHECKPOINT}/data_api_es_flow'
DF.Flow(
DF.load(f'{settings.DATA_DUMP_DIR}/card_data/datapackage.json'),
DF.update_package(title='Card Data', name='srm_card_data'),
DF.update_resource('card_data', name='cards'),
DF.add_field('score', 'number', card_score, resources=['cards']),
DF.set_type(
Expand Down Expand Up @@ -206,10 +210,10 @@ def data_api_es_flow():
dump_to_es_and_delete(
indexes=dict(srm__cards=[dict(resource_name='cards')]),
),
DF.checkpoint(checkpoint),
).process()
DF.Flow(
DF.load(f'{settings.DATA_DUMP_DIR}/card_data/datapackage.json'),
DF.update_package(title='Card Data', name='srm_card_data'),
DF.checkpoint(checkpoint),
DF.delete_fields([
'score', 'possible_autocomplete', 'situations', 'responses', 'collapse_key',
'responses_parents', 'situation_parents', 'situation_ids_parents', 'response_ids_parents',
Expand All @@ -220,10 +224,11 @@ def data_api_es_flow():
settings.CKAN_OWNER_ORG,
),
).process()

DF.Flow(
DF.load(f'{settings.DATA_DUMP_DIR}/card_data/datapackage.json', limit_rows=1),
DF.update_package(title='Card Data', name='srm_card_data'),
DF.checkpoint(checkpoint),
DF.update_resource(-1, name='cards_placeholder'),
DF.filter_rows(lambda r: False),
dump_to_ckan(
settings.CKAN_HOST,
settings.CKAN_API_KEY,
Expand Down Expand Up @@ -432,6 +437,8 @@ def load_autocomplete_to_es_flow():
).process()

def operator(*_):
shutil.rmtree(f'.checkpoints/{CHECKPOINT}', ignore_errors=True, onerror=None)

logger.info('Starting ES Flow')
data_api_es_flow()
load_locations_to_es_flow().process()
Expand Down

0 comments on commit 79bb813

Please sign in to comment.