Skip to content

Commit

Permalink
Merge pull request #134 from GSA/delete-old-record-versions
Browse files Browse the repository at this point in the history
add db clean script
  • Loading branch information
rshewitt authored Feb 10, 2025
2 parents 2b27e91 + bd15c3c commit bccea3f
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 47 deletions.
111 changes: 73 additions & 38 deletions database/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from datetime import datetime, timezone
from functools import wraps

from sqlalchemy import create_engine, func, inspect, select, text
from sqlalchemy import create_engine, desc, func, inspect, select, text
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm import aliased, scoped_session, sessionmaker

from harvester.utils.general_utils import query_filter_builder

Expand Down Expand Up @@ -439,57 +439,92 @@ def update_harvest_record(self, record_id, updates):
self.db.rollback()
return None

def delete_harvest_record(
    self, identifier=None, record_id=None, harvest_source_id=None
):
    """Delete harvest record(s).

    Two calling modes:
      * ``identifier`` + ``harvest_source_id`` — delete every version of
        the record within that harvest source.
      * ``record_id`` — delete that exact row (used by the db clean script).

    Returns a success message string, ``None`` when nothing matched or the
    transaction failed (rolled back), matching this interface's convention.
    """
    try:
        # Start with an empty result so a call with no usable arguments
        # logs the "not found" warning instead of raising
        # UnboundLocalError (previously swallowed by the bare except).
        records = []
        # delete all versions of the record within the given harvest source
        if harvest_source_id is not None and identifier is not None:
            records = (
                self.db.query(HarvestRecord)
                .filter_by(
                    identifier=identifier, harvest_source_id=harvest_source_id
                )
                .all()
            )
        # delete this exact one (used with cleaning)
        if record_id is not None:
            records = self.db.query(HarvestRecord).filter_by(id=record_id).all()
        if len(records) == 0:
            logger.warning(
                f"Harvest records with identifier {identifier} or {record_id} "
                "not found"
            )
            return
        logger.info(
            f"{len(records)} records with identifier {identifier} or {record_id} "
            "found in datagov-harvest-db"
        )
        for record in records:
            self.db.delete(record)
        self.db.commit()
        return "Harvest record deleted successfully"
    except Exception:  # noqa E722 — keep contract: roll back, signal None
        self.db.rollback()
        return None

def get_harvest_record(self, record_id):
    """Fetch a single harvest record by primary key; ``None`` when absent."""
    query = self.db.query(HarvestRecord).filter_by(id=record_id)
    return query.first()

def get_all_outdated_records(self, days=365):
    """
    gets all outdated versions of records older than [days] ago
    for all harvest sources. "outdated" simply means not the latest
    or the opposite of 'get_latest_harvest_records_by_source'
    """
    # Accept a numeric string (e.g. straight from a CLI arg) as well as
    # an int, so callers like scripts/clean-db.py behave predictably.
    days = int(days)

    # every record whose age exceeds the cutoff
    # NOTE(review): relies on SQL EXTRACT over a timestamp difference —
    # confirm the "days" field name renders correctly on the target DB.
    old_records_query = self.db.query(HarvestRecord).filter(
        func.extract("days", (func.now() - HarvestRecord.date_created)) > days
    )

    # latest successful version per (identifier, harvest_source_id);
    # DISTINCT ON requires the matching ORDER BY prefix (Postgres).
    subq = (
        self.db.query(HarvestRecord)
        .filter(HarvestRecord.status == "success")
        .order_by(
            HarvestRecord.identifier,
            HarvestRecord.harvest_source_id,
            desc(HarvestRecord.date_created),
        )
        .distinct(HarvestRecord.identifier, HarvestRecord.harvest_source_id)
        .subquery()
    )

    sq_alias = aliased(HarvestRecord, subq)
    latest_successful_records_query = self.db.query(sq_alias).filter(
        sq_alias.action != "delete"
    )

    # old records minus the ones that are still the latest live version
    return old_records_query.except_all(latest_successful_records_query).all()

def get_latest_harvest_records_by_source(self, source_id):
    """Return dicts for the newest successful, non-deleted version of each
    record identifier within one harvest source.

    Datetimes come back as datetime objects, not strings.
    """
    # newest successful row per identifier (Postgres DISTINCT ON:
    # ORDER BY must lead with the DISTINCT expression)
    latest = (
        self.db.query(HarvestRecord)
        .filter(
            HarvestRecord.status == "success",
            HarvestRecord.harvest_source_id == source_id,
        )
        .order_by(HarvestRecord.identifier, desc(HarvestRecord.date_created))
        .distinct(HarvestRecord.identifier)
        .subquery()
    )

    latest_record = aliased(HarvestRecord, latest)
    rows = (
        self.db.query(latest_record)
        .filter(latest_record.action != "delete")
        .all()
    )

    return self._to_dict(rows)

def close(self):
if hasattr(self.db, "remove"):
Expand Down
10 changes: 7 additions & 3 deletions database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Base(DeclarativeBase):

class Error(db.Model):
__abstract__ = True
date_created = db.Column(db.DateTime, default=func.now())
date_created = db.Column(db.DateTime, default=func.statement_timestamp())
type = db.Column(db.String)
message = db.Column(db.String)

Expand Down Expand Up @@ -90,7 +90,9 @@ class HarvestJob(db.Model):
index=True,
)
job_type = db.Column(db.String(20), default="harvest")
date_created = db.Column(db.DateTime, index=True, default=func.now())
date_created = db.Column(
db.DateTime, index=True, default=func.statement_timestamp()
)
date_finished = db.Column(db.DateTime)
records_added = db.Column(db.Integer)
records_updated = db.Column(db.Integer)
Expand Down Expand Up @@ -118,7 +120,9 @@ class HarvestRecord(db.Model):
)
source_hash = db.Column(db.String)
source_raw = db.Column(db.String)
date_created = db.Column(db.DateTime, index=True, default=func.now())
date_created = db.Column(
db.DateTime, index=True, default=func.statement_timestamp()
)
date_finished = db.Column(db.DateTime, index=True)
ckan_id = db.Column(db.String, index=True)
ckan_name = db.Column(db.String, index=True)
Expand Down
14 changes: 9 additions & 5 deletions harvester/harvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,9 +469,8 @@ def send_notification_emails(self, results: dict) -> None:
def sync_job_helper(self):
"""Kickstart a sync job where we're just syncing records"""
logger.info(f"starting sync job for {self.name}")
job = self.db_interface.get_first_harvest_job_by_filter(
{"harvest_source_id": self.id}
)
# get the job before this one to pick up where it left off
job = self.db_interface.get_harvest_jobs_by_source_id(self.id)[1]
results = {
"previous_job_results": {
"records_added": job.records_added,
Expand Down Expand Up @@ -509,7 +508,9 @@ def clear_helper(self):
self.records.extend(records)
logger.warning(f"{len(db_records)} uncleared incoming db records")
for record in db_records:
self.db_interface.delete_harvest_record(record.identifier)
self.db_interface.delete_harvest_record(
identifier=record.identifier, harvest_source_id=self.id
)

def make_record_contract(self, db_record):
"""Helper to hydrate a db record"""
Expand Down Expand Up @@ -807,7 +808,10 @@ def update_self_in_db(self) -> bool:
)

def delete_self_in_db(self) -> bool:
    """Delete every stored version of this record within its harvest source.

    Only acts when this record's status is "success"; the annotation
    promised a bool, so report whether a deletion was attempted.
    """
    if self.status == "success":
        self.harvest_source.db_interface.delete_harvest_record(
            identifier=self.identifier, harvest_source_id=self.harvest_source.id
        )
        return True
    return False

def ckanify_dcatus(self) -> None:
from harvester.utils.ckan_utils import ckanify_dcatus
Expand Down
26 changes: 26 additions & 0 deletions scripts/clean-db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import argparse
import os
import sys

sys.path.insert(1, "/".join(os.path.realpath(__file__).split("/")[0:-2]))

from harvester import HarvesterDBInterface # noqa E402


def main(days):
    """Delete all outdated harvest records older than *days* days.

    Record errors are removed along with each record via DB cascade.
    """
    interface = HarvesterDBInterface()
    outdated_records = interface.get_all_outdated_records(days)
    for record in outdated_records:
        # fix: the interface keyword is `record_id`, not `ID` —
        # `ID=` raised TypeError. Deleting by id cascades to errors.
        interface.delete_harvest_record(record_id=record.id)
    interface.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="Harvest database cleaner",
        description="cleans outdated harvest records and errors",
    )
    # parse as int so the age comparison receives a number, not a string
    parser.add_argument("days", type=int, help="harvest records [days] old")
    args = parser.parse_args(sys.argv[1:])

    main(args.days)
12 changes: 11 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,18 @@ def latest_records(
"source_raw": "data_123",
"status": "success",
"action": "create",
# different harvest source and job
"harvest_source_id": source_data_dcatus_2["id"],
"harvest_job_id": job_data_dcatus_2["id"], #
"harvest_job_id": job_data_dcatus_2["id"],
},
{
"identifier": "f",
"date_created": "2024-04-04T00:00:00.001Z",
"source_raw": "data_123456",
"status": "success",
"action": "update",
"harvest_source_id": source_data_dcatus_2["id"],
"harvest_job_id": job_data_dcatus_2["id"],
},
]

Expand Down
64 changes: 64 additions & 0 deletions tests/integration/database/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,70 @@ def test_get_latest_harvest_records(
# make sure there aren't records that are different
assert not any(x != y for x, y in zip(latest_records, expected_records))

def test_delete_outdated_records_and_errors(
    self,
    interface,
    organization_data,
    source_data_dcatus,
    source_data_dcatus_2,
    job_data_dcatus,
    job_data_dcatus_2,
    latest_records,
):
    """Outdated record versions (and their errors, via cascade) are deleted
    while the latest records of every harvest source survive."""
    interface.add_organization(organization_data)
    interface.add_harvest_source(source_data_dcatus)
    # another source for querying against. see last records in
    # `latest_records` fixture
    interface.add_harvest_source(source_data_dcatus_2)
    interface.add_harvest_job(job_data_dcatus)
    interface.add_harvest_job(job_data_dcatus_2)
    records = [interface.add_harvest_record(record) for record in latest_records]

    # only adding 1 record error for simplicity
    # we have access to all record errors via relationship in HarvestRecord
    error_data = {
        "message": "record is invalid",
        "type": "ValidationException",
        "date_created": datetime.now(timezone.utc),
        "harvest_record_id": records[-2].id,
    }
    interface.add_harvest_record_error(error_data)

    latest_records_from_db1 = interface.get_latest_harvest_records_by_source(
        source_data_dcatus["id"]
    )

    latest_records_from_db2 = interface.get_latest_harvest_records_by_source(
        source_data_dcatus_2["id"]
    )

    outdated_records = interface.get_all_outdated_records(90)
    assert len(outdated_records) == 7

    all_records = (
        len(latest_records_from_db1)
        + len(latest_records_from_db2)
        + len(outdated_records)
    )

    # latest records for all harvest sources (2) and all outdated records
    # should be equal to the original fixture count
    assert all_records == len(latest_records)

    # we want outdated records for ALL harvest sources. this is harvest source 2
    hs2_outdated = next(r for r in outdated_records if r.identifier == "f")
    assert len(hs2_outdated.errors) == 1

    for record in outdated_records:
        # fix: the interface keyword is `record_id`, not `ID`
        interface.delete_harvest_record(record_id=record.id)

    # make sure only the outdated records and associated errors were deleted
    db_records = interface.pget_harvest_records(count=True)
    assert db_records == len(latest_records_from_db1) + len(latest_records_from_db2)

    db_record_errors = interface.pget_harvest_record_errors(count=True)
    assert db_record_errors == 0

def test_faceted_builder_queries(
self,
interface,
Expand Down

5 comments on commit bccea3f

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Title Coverage Tests Skipped Failures Errors Time
Unit tests Coverage 39 0 💤 0 ❌ 0 🔥 0.987s ⏱️
Integration Tests Coverage 71 0 💤 2 ❌ 0 🔥 4.476s ⏱️
Functional Tests Coverage 2 0 💤 0 ❌ 0 🔥 8.845s ⏱️

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Title Coverage Tests Skipped Failures Errors Time
Unit tests Coverage 39 0 💤 0 ❌ 0 🔥 1.039s ⏱️
Integration Tests Coverage 71 0 💤 2 ❌ 0 🔥 4.64s ⏱️
Functional Tests Coverage 2 0 💤 0 ❌ 0 🔥 6.536s ⏱️

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Title Coverage Tests Skipped Failures Errors Time
Unit tests Coverage 39 0 💤 0 ❌ 0 🔥 1.027s ⏱️
Integration Tests Coverage 71 0 💤 2 ❌ 0 🔥 4.616s ⏱️
Functional Tests Coverage 2 0 💤 0 ❌ 0 🔥 8.211s ⏱️

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Title Coverage Tests Skipped Failures Errors Time
Unit tests Coverage 39 0 💤 0 ❌ 0 🔥 1.025s ⏱️
Integration Tests Coverage 71 0 💤 2 ❌ 0 🔥 4.709s ⏱️
Functional Tests Coverage 2 0 💤 0 ❌ 0 🔥 6.354s ⏱️

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Title Coverage Tests Skipped Failures Errors Time
Unit tests Coverage 39 0 💤 0 ❌ 0 🔥 1.029s ⏱️
Integration Tests Coverage 71 0 💤 2 ❌ 0 🔥 4.817s ⏱️
Functional Tests Coverage 2 0 💤 0 ❌ 0 🔥 7.928s ⏱️

Please sign in to comment.