Skip to content

Commit

Permalink
duplicate task check on unexpired and ongoing/all tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
dchhabda committed Feb 18, 2025
1 parent edc8432 commit 7e41863
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
16 changes: 11 additions & 5 deletions pybossa/api/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def _update_attribute(self, new, old):

def _preprocess_post_data(self, data):
project_id = data["project_id"]
project = project_repo.get(project_id)
if not project:
raise NotFound(f'Non existing project id {project_id}')

info = data["info"]
if isinstance(info, dict):
hdfs_task = any([val.startswith("/fileproxy/hdfs/") for val in info.values() if isinstance(val, str)])
Expand All @@ -97,9 +101,14 @@ def _preprocess_post_data(self, data):
except Exception as e:
current_app.logger.info("Project %d. Error generating duplicate task checksum %s", project_id, str(e))
raise BadRequest(str(e))

data["dup_checksum"] = dup_checksum
duplicate = task_repo.find_duplicate(project_id=project_id, info=info, dup_checksum=dup_checksum)
completed_tasks = project.info.get("duplicate_task_check", {}).get("completed_tasks", False)
duplicate = task_repo.find_duplicate(
project_id=project_id,
info=info,
dup_checksum=dup_checksum,
completed_tasks=completed_tasks
)
if duplicate:
current_app.logger.info("Project %d, task checksum %s. Duplicate task found with task id %d. Ignoring task creation", project_id, dup_checksum, duplicate)
message = {
Expand All @@ -110,9 +119,6 @@ def _preprocess_post_data(self, data):


if 'n_answers' not in data:
project = project_repo.get(project_id)
if not project:
raise NotFound(f'Non existing project id {project_id}')
data['n_answers'] = project.get_default_n_answers()
user_pref = data.get('user_pref', {})
if user_pref.get('languages'):
Expand Down
10 changes: 8 additions & 2 deletions pybossa/importers/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,14 @@ def create_tasks(self, task_repo, project, importer=None, **form_data):
num = 0
importer = importer or self._create_importer_for(**form_data)
tasks = importer.tasks()
total_tasks_count = len(tasks) if isinstance(tasks, list) else 0
header_report = self._validate_headers(importer, project, **form_data)
if header_report:
return header_report
msg = ''
validator = TaskImportValidator(get_enrichment_output_fields(project))
n_answers = project.get_default_n_answers()
completed_tasks = project.info.get("duplicate_task_check", {}).get("completed_tasks", False)
try:
for task_data in tasks:
# As tasks are getting created, pass current date as create_date
Expand All @@ -233,8 +235,10 @@ def create_tasks(self, task_repo, project, importer=None, **form_data):
set_gold_answers(task, gold_answers)

found = task_repo.find_duplicate(project_id=project.id,
info=task.info,
dup_checksum=task.dup_checksum)
info=task.info,
dup_checksum=task.dup_checksum,
completed_tasks=completed_tasks
)
if found is not None:
current_app.logger.info("Project %d, task checksum %s. Duplicate task found with task id %d", project.id, task.dup_checksum, found)
continue
Expand All @@ -260,6 +264,8 @@ def create_tasks(self, task_repo, project, importer=None, **form_data):
msg = str(num) + " " + gettext('new task was imported successfully. ')
else:
msg = str(num) + " " + gettext('new tasks were imported successfully. ')
if num > 0 and num < total_tasks_count:
msg += str(total_tasks_count - num) + " " + gettext('tasks not imported. ')
msg += str(validator)
if data_access_levels and 'data_access' in importer.headers():
msg += gettext('Task data_access column will not impact data classification. This is done at project level only.')
Expand Down
8 changes: 5 additions & 3 deletions pybossa/repositories/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ def update_priority(self, project_id, priority, filters):
self.db.session.commit()
cached_projects.clean_project(project_id)

def find_duplicate(self, project_id, info, dup_checksum=None):
def find_duplicate(self, project_id, info, dup_checksum=None, completed_tasks=False):
"""
Find a task id in the given project with the project info using md5
index on info column casted as text. Md5 is used to avoid key size
Expand All @@ -504,12 +504,14 @@ def find_duplicate(self, project_id, info, dup_checksum=None):
# with task payload containing dup_checksum value, perform duplicate
# check based on checksum instead of comparing entire task payload info
if dup_checksum:
task_state_cond = "AND task.state='ongoing'" if not completed_tasks else ""
sql = text('''
SELECT task.id as task_id
FROM task
WHERE task.project_id=:project_id
AND task.dup_checksum=:dup_checksum
''')
AND task.expiration > (now() at time zone 'utc')::timestamp
{};'''.format(task_state_cond))
row = self.db.session.execute(
sql, dict(project_id=project_id, dup_checksum=dup_checksum)).first()
else:
Expand All @@ -518,7 +520,7 @@ def find_duplicate(self, project_id, info, dup_checksum=None):
FROM task
WHERE task.project_id=:project_id
AND task.state='ongoing'
AND md5(task.info::text)=md5(((:info)::jsonb)::text)
AND md5(task.info::text)=md5(((:info)::jsonb)::text);
''')
info = json.dumps(info, allow_nan=False)
row = self.db.session.execute(
Expand Down

0 comments on commit 7e41863

Please sign in to comment.