Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Oct 19, 2024
1 parent 0fdae3b commit c682878
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
1 change: 1 addition & 0 deletions operators/manual_data_entry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def mde_prepare():
DF.filter_rows(lambda r: r.get('Org Id') != 'dummy'),
stats.filter_with_stat('Manual Data Entry: Entry not ready to publish', lambda r: r.get('Status') == 'בייצור'),
stats.filter_with_stat('Manual Data Entry: No Org ID or Org Name', lambda r: (r.get('Org Id') or r.get('Org Name')), report=unknown_entity_ids),
DF.add_field('_row_id', 'string', lambda r: r[AIRTABLE_ID_FIELD]),
)

data_sources = DF.Flow(
Expand Down
4 changes: 3 additions & 1 deletion operators/manual_data_entry/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ def func(rows):
filter_ready_to_publish(stats),
# DF.printer(),
).results()[0][0]
for service in services:
for i, service in enumerate(services):
emit = dict()
emit['_row_id'] = f'{URL}#{i}'
emit['Status'] = 'בייצור'
emit['Branch Address'] = service['כתובת או שם ישוב בו מסופק השירות'] or row.get('Branch Address')
emit['Branch Details'] = None
Expand Down Expand Up @@ -103,6 +104,7 @@ def func(rows):
DF.add_field('taxonomies', 'array'),
DF.add_field('target_audiences', 'string'),
DF.add_field('notes', 'string'),
DF.add_field('_row_id', 'string'),
func
)

Expand Down
32 changes: 21 additions & 11 deletions operators/manual_data_entry/mde_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def mde_organization_flow():
def mde_id(*args):
    """Build a manual-data-entry identifier from an arbitrary list of parts.

    Every positional argument is converted with ``str`` (so ``None`` becomes
    the text ``'None'``), the resulting strings are handed to ``hasher`` as
    separate positional arguments, and the digest is prefixed with ``'mde:'``.
    """
    parts = (str(part) for part in args)
    digest = hasher(*parts)
    return 'mde:' + digest

def mde_branch_id(row):
    """Return the content-derived branch key for a branch row or a service row.

    The key is ``mde_id`` over organization, operating unit, address and
    geocode. Branch rows carry the location under ``address`` / ``geocode``.
    Service rows were previously hashed from ``branch_address`` /
    ``branch_geocode``, so those keys are used as a fallback when the plain
    ones are absent. Without the fallback a service row hashes
    ``(org, unit, None, None)`` and never matches the key recorded for its
    branch.

    Raises ``KeyError`` if ``organization`` or ``operating_unit`` is missing.
    """
    # Test key presence (not truthiness) so a branch row whose address is
    # explicitly None still hashes exactly as it did before.
    if 'address' in row:
        address = row['address']
    else:
        address = row.get('branch_address')
    if 'geocode' in row:
        geocode = row['geocode']
    else:
        geocode = row.get('branch_geocode')
    return mde_id(row['organization'], row['operating_unit'], address, geocode)

def branch_updater():
def func(row):
if not row.get('data'):
Expand All @@ -129,12 +132,16 @@ def func(row):
return func


def mde_branch_flow(source_id):
def mde_branch_flow(source_id, branch_ids):

def update_branch_ids(row):
    """Record the mapping from a branch row's ``_id`` to its ``id``.

    Writes into the ``branch_ids`` dict supplied to the enclosing flow so that
    later steps can look up a branch's ``id`` by its ``_id``. The row itself
    is not modified and nothing is returned.
    """
    published_id = row['id']
    branch_ids[row['_id']] = published_id

branches = DF.Flow(
DF.checkpoint(CHECKPOINT),
DF.update_resource(-1, name='branches'),
DF.select_fields(['Org Id', 'Org Name', 'Org Short Name', 'Branch Details', 'Branch Address', 'Branch Geocode',
'Branch Phone Number', 'Branch Email', 'Branch Website']),
'Branch Phone Number', 'Branch Email', 'Branch Website', '_row_id']),
DF.rename_fields({
'Org Id': 'organization',
'Branch Details': 'name',
Expand All @@ -146,9 +153,10 @@ def mde_branch_flow(source_id):
}),
DF.add_field('operating_unit', 'string', lambda r: r.get('Org Short Name') or r.get('Org Name')),
DF.delete_fields(['Org Name', 'Org Short Name']),
DF.add_field('id', 'string', lambda r: mde_id(r['organization'], r['operating_unit'], r.get('address'), r.get('geocode'))),
DF.join_with_self('branches', ['id'], dict(
id=None,
DF.add_field('_id', 'string', lambda r: mde_branch_id(r)),
DF.join_with_self('branches', ['_id'], dict(
_id=None,
id=dict(name='row_id', aggregate='min'),
name=None,
operating_unit=None,
address=None,
Expand All @@ -158,6 +166,7 @@ def mde_branch_flow(source_id):
urls=None,
organization=None,
)),
update_branch_ids,
DF.add_field('data', 'object', lambda r: dict(
name=r['name'],
operating_unit=r['operating_unit'],
Expand Down Expand Up @@ -201,7 +210,7 @@ def func(row):

return func

def mde_service_flow(data_sources, source_id):
def mde_service_flow(data_sources, source_id, branch_ids):

# data_sources = DF.Flow(
# load_from_airtable(settings.AIRTABLE_DATAENTRY_BASE, 'DataReferences', settings.AIRTABLE_VIEW, settings.AIRTABLE_API_KEY),
Expand All @@ -213,7 +222,7 @@ def mde_service_flow(data_sources, source_id):
DF.update_resource(-1, name='services'),
DF.select_fields(['Org Id', 'Org Name', 'Org Short Name', 'Branch Address', 'Branch Geocode', 'Data Source',
'Service Name', 'Service Description', 'Service Conditions', 'Service Phone Number', 'Service Email', 'Service Website',
'responses_ids', 'situations_ids', 'target_audiences', 'notes']),
'responses_ids', 'situations_ids', 'target_audiences', 'notes', '_row_id']),
DF.rename_fields({
'Org Id': 'organization',
'Data Source': 'data_source',
Expand All @@ -227,7 +236,7 @@ def mde_service_flow(data_sources, source_id):
'Service Website': 'urls',
}),
DF.add_field('operating_unit', 'string', lambda r: r.get('Org Short Name') or r.get('Org Name')),
DF.add_field('branch_id', 'string', lambda r: mde_id(r['organization'], r['operating_unit'], r.get('branch_address'), r.get('branch_geocode'))),
DF.add_field('branch_id', 'string', lambda r: branch_ids[mde_branch_id(r)]),
DF.add_field('data', 'object', lambda r: dict(
name=r['name'],
description=r.get('description'),
Expand All @@ -243,7 +252,7 @@ def mde_service_flow(data_sources, source_id):
notes=r.get('notes'),
)),
handle_no_taxonomies(),
DF.add_field('id', 'string', lambda r: mde_id(r['branch_id'], r['name'])),
DF.add_field('id', 'string', lambda r: mde_id(r['branch_id'], r['_row_id'])),
DF.select_fields(['id', 'data']),
).results()[0][0]

Expand All @@ -267,6 +276,7 @@ def load_manual_data(source_flow, data_sources, source_id='manual-data-entry'):
).process()

mde_organization_flow()
mde_branch_flow(source_id)
mde_service_flow(data_sources, source_id)
branch_ids = {}
mde_branch_flow(source_id, branch_ids)
mde_service_flow(data_sources, source_id, branch_ids)

0 comments on commit c682878

Please sign in to comment.