Skip to content

Commit

Permalink
Merge pull request #125 from GSA/datasets-purge
Browse files Browse the repository at this point in the history
Datasets purge from Ckan and harvestdb
  • Loading branch information
Jin-Sun-tts authored Jan 14, 2025
2 parents 6b4aeb4 + ed74f55 commit 1718c3a
Showing 10 changed files with 57 additions and 136 deletions.
22 changes: 4 additions & 18 deletions app/routes.py
Original file line number Diff line number Diff line change
@@ -472,7 +472,7 @@ def view_harvest_source_data(source_id: str):
count=True,
skip_pagination=True,
source_id=source.id,
facets=["ckan_id != null"],
facets=["ckan_id is not null"],
)
error_records_count = db.get_harvest_records_by_source(
count=True,
@@ -589,20 +589,6 @@ def edit_harvest_source(source_id: str):
return db._to_dict(source)


# Clear Source
@mod.route("/harvest_source/config/clear/<source_id>", methods=["POST"])
@login_required
def clear_harvest_source(source_id):
try:
result = db.clear_harvest_source(source_id)
flash(result)
return {"message": "success"}
except Exception as e:
logger.error(f"Failed to clear harvest source :: {repr(e)}")
flash("Failed to clear harvest source")
return {"message": "failed"}


# Delete Source
@mod.route("/harvest_source/config/delete/<source_id>", methods=["POST"])
@login_required
@@ -618,9 +604,9 @@ def delete_harvest_source(source_id):


### Trigger Harvest
@mod.route("/harvest_source/harvest/<source_id>", methods=["GET"])
def trigger_harvest_source(source_id):
message = load_manager.trigger_manual_job(source_id)
@mod.route("/harvest_source/harvest/<source_id>", methods=["GET"])
@mod.route("/harvest_source/harvest/<source_id>/<job_type>", methods=["GET"])
def trigger_harvest_source(source_id, job_type="harvest"):
    """Manually queue a job for a harvest source, then return to its page.

    :param source_id: ID of the harvest source to run the job against
    :param job_type: kind of job to queue, "harvest" (default) or "clear";
        the default keeps the pre-existing URL without a job type working
    :return: redirect to the harvest source detail page; the outcome is
        reported through a flashed message
    """
    # NOTE(review): "clear" is destructive but is reachable through an
    # unauthenticated GET here - consider POST + @login_required.
    allowed_job_types = ("harvest", "clear")
    if job_type not in allowed_job_types:
        # Reject unknown values instead of persisting arbitrary URL text as
        # the job type of a new job.
        flash(f"Unknown job type: {job_type}")
    else:
        message = load_manager.trigger_manual_job(source_id, job_type)
        flash(message)
    return redirect(f"/harvest_source/{source_id}")

6 changes: 4 additions & 2 deletions app/templates/view_data.html
Original file line number Diff line number Diff line change
@@ -27,8 +27,10 @@ <h2>{{action}} {{data_type}}</h2>
{# The trigger route takes the job type as a URL segment, so it must be passed to url_for. #}
<a href="{{ url_for('harvest.trigger_harvest_source', source_id=source_id, job_type='harvest') }}">
<button class="btn btn-secondary">Harvest</button>
</a>
<button class="btn btn-danger"
onclick="confirmAction('harvest source', 'clear', '/harvest_source/config/clear/{{source_id}}')">Clear</button>
{# This template refers to the source through `source_id`, like the Harvest and Delete controls next to it. #}
<a href="{{ url_for('harvest.trigger_harvest_source', source_id=source_id, job_type='clear') }}"
onclick="return confirm('Are you sure you want to clear all datasets?');">
<button class="btn btn-primary">Clear</button>
</a>
<button class="btn btn-danger"
onclick="confirmAction('harvest source', 'delete', '/harvest_source/config/delete/{{source_id}}')">Delete</button>
{% elif org_id %}
13 changes: 10 additions & 3 deletions app/templates/view_source_data.html
Original file line number Diff line number Diff line change
@@ -40,13 +40,15 @@ <h2>Harvest Source Config Table</h2>
</a>
</li>
<li class="usa-button-group__item">
<a href="{{ url_for('harvest.trigger_harvest_source', source_id=data.harvest_source.id)}}">
<a href="{{ url_for('harvest.trigger_harvest_source', source_id=data.harvest_source.id, job_type='harvest')}}">
<button class="usa-button usa-button--base">Harvest</button>
</a>
</li>
<li class="usa-button-group__item">
<button class="usa-button usa-button--secondary"
onclick="confirmAction('clear', '/harvest_source/config/clear/{{data.harvest_source.id}}')">Clear</button>
{# Queues a "clear" job for this source; styled like the other USWDS buttons in this group. #}
<a href="{{ url_for('harvest.trigger_harvest_source', source_id=data.harvest_source.id, job_type='clear') }}"
onclick="return confirm('Are you sure you want to clear all datasets?');">
<button class="usa-button usa-button--secondary">Clear</button>
</a>
</li>
<li class="usa-button-group__item">
<button class="usa-button usa-button--secondary"
@@ -57,6 +59,9 @@ <h2>Harvest Source Config Table</h2>
{% endif %}
<br>
<h2>Harvest Records: </h2>
{% if data.harvest_jobs and (data.harvest_jobs | selectattr('status', 'equalto', 'in_progress') | list | length > 0) %}
<span style="color: orange; font-weight: bold;">Active Job in Progress</span>
{% endif %}
<div class="config-table">
<table class="table">
<tr>
@@ -98,6 +103,7 @@ <h2>Harvest Jobs</h2>
<thead>
<tr>
<th data-sortable scope="col" role="columnheader">Id</th>
<th data-sortable scope="col" role="columnheader">Type</th>
<th data-sortable scope="col" role="columnheader">Status</th>
<th data-sortable scope="col" role="columnheader">Date Created</th>
<th data-sortable scope="col" role="columnheader">Date Finished</th>
@@ -113,6 +119,7 @@ <h2>Harvest Jobs</h2>
<tr>
<th scope="row"><a href="{{ url_for('harvest.get_harvest_job', job_id=job.id) }}">{{job.id}}</a>
</th>
{# data-sort-value must be a rendered expression of the loop variable `job`, matching the value shown in the cell. #}
<td data-sort-value="{{ job.job_type }}"> {{job.job_type}}</td>
<td data-sort-value="{{ job.status }}"> {{job.status}}</td>
<td data-sort-value="{{ job.date_created }}"> {{job.date_created}}</td>
<td data-sort-value="{{ job.date_finished }}">{{job.date_finished}} </td>
122 changes: 17 additions & 105 deletions database/interface.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import logging
import os
import time
import uuid
from datetime import datetime, timezone
from functools import wraps

import ckanapi
from ckanapi import RemoteCKAN
from sqlalchemy import create_engine, func, inspect, or_, select, text
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import scoped_session, sessionmaker
@@ -190,113 +187,15 @@ def update_harvest_source(self, source_id, updates):
self.db.rollback()
return None

def clear_harvest_source(self, source_id):
"""
Clear all datasets related to a harvest source in CKAN, and clean up the
harvest_record and harvest_record_error tables.
:param source_id: ID of the harvest source to clear
"""

# delete all HarvestRecords and related HarvestRecordErrors
def _clear_harvest_records():
self.db.query(HarvestRecordError).filter(
HarvestRecordError.harvest_record_id.in_(
self.db.query(HarvestRecord.id).filter_by(
harvest_source_id=source_id
)
)
).delete(synchronize_session=False)
self.db.query(HarvestRecord).filter_by(harvest_source_id=source_id).delete()
self.db.commit()

source = self.db.get(HarvestSource, source_id)
if source is None:
return "Harvest source not found"

organization_id = source.organization_id

records = (
self.db.query(HarvestRecord).filter_by(harvest_source_id=source_id).all()
)

if not records:
return "Harvest source has no records to clear."

ckan_ids = [record.ckan_id for record in records if record.ckan_id is not None]
error_records = [record for record in records if record.status == "error"]
jobs_in_progress = self.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source.id, "status": "in_progress"}
)

# Ensure no jobs are in progress
if jobs_in_progress:
return (
"Error: A harvest job is currently in progress. "
"Cannot clear datasets."
)

# Ensure (error_records + ckan_ids) = total records
if len(error_records) + len(ckan_ids) != len(records):
return (
"Error: Not all records are either in an error state "
"or have a CKAN ID. Cannot proceed without clearing the dataset."
)

if not ckan_ids:
_clear_harvest_records()
return "Harvest source cleared successfully."

ckan = RemoteCKAN(os.getenv("CKAN_API_URL"), apikey=os.getenv("CKAN_API_TOKEN"))

result = ckan.action.package_search(fq=f"harvest_source_id:{source_id}")
ckan_datasets = result["count"]
start = datetime.now(timezone.utc)
retry_count = 0
retry_max = 20

# Retry loop to handle timeouts from cloud.gov and CKAN's Solr backend,
# ensuring datasets are cleared despite possible interruptions.
while ckan_datasets > 0 and retry_count < retry_max:
result = ckan.action.package_search(fq=f"harvest_source_id:{source_id}")
ckan_datasets = result["count"]
logger.info(
f"Attempt {retry_count + 1}: "
f"{ckan_datasets} datasets remaining in CKAN"
)
try:
ckan.action.bulk_update_delete(
datasets=ckan_ids, org_id=organization_id
)
except ckanapi.errors.CKANAPIError as api_err:
logger.error(f"CKAN API error: {api_err}")
except Exception as err:
logger.error(f"Error occurred: {err} \n error_type: {type(err)}")
return f"Error occurred: {err}"

retry_count += 1
time.sleep(5)

# If all datasets are deleted from CKAN, clear harvest records
if ckan_datasets == 0:
logger.info("All datasets cleared from CKAN, clearing harvest records.")
_clear_harvest_records()
logger.info(f"Total time: {datetime.now(timezone.utc) - start}")
return "Harvest source cleared successfully."
else:
fail_message = (
f"Harvest source clearance failed after {retry_count} "
f"attempts. {ckan_datasets} datasets still exist in CKAN."
)
logger.error(fail_message)
return fail_message

def delete_harvest_source(self, source_id):
source = self.db.get(HarvestSource, source_id)
if source is None:
return "Harvest source not found"

records = (
self.db.query(HarvestRecord).filter_by(harvest_source_id=source_id).all()
self.db.query(HarvestRecord).filter(
HarvestRecord.harvest_source_id==source_id,
HarvestRecord.ckan_id.isnot(None)).all()
)

if len(records) == 0:
@@ -503,6 +402,18 @@ def update_harvest_record(self, record_id, updates):
self.db.rollback()
return None

def delete_harvest_record(self, ckan_id):
    """
    Delete every harvest record that carries the given CKAN id.
    :param ckan_id: CKAN id stored on the harvest record(s) to delete
    :return: True if at least one record was deleted, otherwise False
    :raises Exception: any database error is re-raised after a rollback
    """
    # filter_by(ckan_id=None) is rendered as "ckan_id IS NULL", which would
    # match - and delete - every record that has no CKAN id at all.
    if ckan_id is None:
        return False

    records = self.db.query(HarvestRecord).filter_by(ckan_id=ckan_id).all()
    if not records:
        return False

    # Log if there are multiple records with the same ckan_id
    if len(records) > 1:
        logger.warning(f"Multiple records found with ckan_id={ckan_id}")

    try:
        for record in records:
            self.db.delete(record)
        self.db.commit()
    except Exception:
        # Roll back so the shared session stays usable, then let the caller
        # decide how to report the failure.
        self.db.rollback()
        raise
    return True

def get_harvest_record(self, record_id):
return self.db.query(HarvestRecord).filter_by(id=record_id).first()

@@ -626,5 +537,6 @@ def get_harvest_records_by_job(self, job_id, facets=[], **kwargs):
return self.pget_harvest_records(filter=text(filter_string), **kwargs)

def get_harvest_records_by_source(self, source_id, facets=[], **kwargs):
filter_string = " AND ".join([f"harvest_source_id = '{source_id}'"] + facets)
filter_string = " AND ".join([f"harvest_source_id = '{source_id}'"]
+ ["action != 'delete'"]+ facets)
return self.pget_harvest_records(filter=text(filter_string), **kwargs)
1 change: 1 addition & 0 deletions database/models.py
Original file line number Diff line number Diff line change
@@ -89,6 +89,7 @@ class HarvestJob(db.Model):
nullable=False,
index=True,
)
job_type = db.Column(db.String(20), default="harvest")
date_created = db.Column(db.DateTime, index=True, default=func.now())
date_finished = db.Column(db.DateTime)
records_added = db.Column(db.Integer)
15 changes: 11 additions & 4 deletions harvester/harvest.py
Original file line number Diff line number Diff line change
@@ -211,10 +211,14 @@ def prepare_internal_data(self) -> None:
def prepare_external_data(self) -> None:
logger.info("retrieving and preparing external records.")
try:
job = self.db_interface.get_harvest_job(self.job_id)
if self.source_type == "document":
self.external_records_to_id_hash(
download_file(self.url, ".json")["dataset"]
)
if job.job_type == 'clear':
self.external_records_to_id_hash([])
else:
self.external_records_to_id_hash(
download_file(self.url, ".json")["dataset"]
)
if self.source_type == "waf":
# TODO
self.external_records_to_id_hash(download_waf(traverse_waf(self.url)))
@@ -307,7 +311,7 @@ def synchronize_records(self) -> None:
self.external_records[i].action = action
try:
self.external_records[i].delete_record()
self.external_records[i].update_self_in_db()
self.external_records[i].delete_self_in_db()
except Exception as e:
self.external_records[i].status = "error"
raise SynchronizeException(
@@ -650,6 +654,9 @@ def update_self_in_db(self) -> bool:
data,
)

def delete_self_in_db(self) -> bool:
    """Delete this record from the harvest database by its ``ckan_id``.

    Delegates to ``delete_harvest_record`` on the harvest source's database
    interface.

    Returns:
        The truthiness of the delegate's result, so the declared ``bool``
        return type is honoured instead of always yielding ``None``.
    """
    db_interface = self.harvest_source.db_interface
    return bool(db_interface.delete_harvest_record(self.ckan_id))

def ckanify_dcatus(self) -> None:
from harvester.utils.ckan_utils import ckanify_dcatus

3 changes: 2 additions & 1 deletion harvester/lib/load_manager.py
Original file line number Diff line number Diff line change
@@ -125,7 +125,7 @@ def schedule_next_job(self, source_id):
logger.info(message)
return message

def trigger_manual_job(self, source_id):
def trigger_manual_job(self, source_id, job_type='harvest'):
"""manual trigger harvest job,
takes a source_id"""
source = interface.get_harvest_source(source_id)
@@ -138,6 +138,7 @@ def trigger_manual_job(self, source_id):
{
"harvest_source_id": source.id,
"status": "new",
"job_type": job_type,
"date_created": datetime.now(),
}
)
8 changes: 6 additions & 2 deletions tests/integration/app/test_login_required.py
Original file line number Diff line number Diff line change
@@ -85,7 +85,9 @@ def test_harvest_data_edit_buttons__logged_in(
source_edit_text = (
f'<a href="/harvest_source/config/edit/{source_data_dcatus["id"]}"'
)
source_clear_text = f"onclick=\"confirmAction('clear', '/harvest_source/config/clear/{source_data_dcatus['id']}')"
source_clear_text = (
f'<a href="/harvest_source/harvest/{source_data_dcatus["id"]}/clear"'
)
source_delete_text = f"onclick=\"confirmAction('delete', '/harvest_source/config/delete/{source_data_dcatus['id']}')"
assert res.status_code == 200
assert res.text.find(button_string_text) != -1
@@ -103,7 +105,9 @@ def test_harvest_data_edit_buttons__logged_out(
source_edit_text = (
f'<a href="/harvest_source/config/edit/{source_data_dcatus["id"]}"'
)
source_clear_text = f"onclick=\"confirmAction('clear', '/harvest_source/config/clear/{source_data_dcatus['id']}')"
source_clear_text = (
f'<a href="/harvest_source/harvest/{source_data_dcatus["id"]}/clear"'
)
source_delete_text = f"onclick=\"confirmAction('delete', '/harvest_source/config/delete/{source_data_dcatus['id']}')"
assert res.status_code == 200
assert res.text.find(button_string_text) == -1
1 change: 1 addition & 0 deletions tests/integration/database/test_db.py
Original file line number Diff line number Diff line change
@@ -105,6 +105,7 @@ def test_delete_harvest_source(
# Add the harvest source again
source = interface.add_harvest_source(source_data_dcatus)
interface.add_harvest_job(job_data_dcatus)
record_data_dcatus[0]['ckan_id']=record_data_dcatus[0]['id']
interface.add_harvest_record(record_data_dcatus[0])

response = interface.delete_harvest_source(source.id)
2 changes: 1 addition & 1 deletion tests/integration/harvest/test_ckan_load.py
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ def test_sync(
assert created == 6
assert updated == 1
assert deleted == 1
assert succeeded == 8
assert succeeded == 7
assert errored == 0

def test_ckanify_dcatus(

1 comment on commit 1718c3a

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests Skipped Failures Errors Time
2 0 💤 0 ❌ 0 🔥 6.122s ⏱️

Please sign in to comment.