diff --git a/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py b/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py index ddb5cbb8c0..a32fec149e 100644 --- a/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py +++ b/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py @@ -1,6 +1,7 @@ import argparse import datetime import inspect +import math import os import time from typing import Callable, Dict, List, Optional, cast @@ -321,11 +322,25 @@ def get_pdfs( throttle.maybe_wait() if cycle_checker.check_if_cycled(row.court_id): - print( - f"Court cycle completed. Sleep 1 second before starting the next cycle." + # How many courts we cycled in the previous cycle + cycled_items_count = cycle_checker.count_prev_iteration_courts + + # Update the queue size where the max number is close to the number + # of courts we did on the previous cycle, that way we can try to avoid + # having more than one item of each court in the queue until it shortens + min_items = math.ceil(cycled_items_count / 2) + if min_items < 50: + # we set the limit to 50 to keep this number less than the defaults + # from the class to avoid having a lot of items + throttle.update_min_items(min_items) + + logger.info( + f"Court cycle completed for: {row.court_id}. Current iteration: {cycle_checker.current_iteration}. Sleep 2 seconds " + f"before starting the next cycle."
) - time.sleep(1) + time.sleep(2) + logger.info(f"Processing row id: {row.id} from {row.court_id}") c = chain( process_free_opinion_result.si( row.pk, @@ -450,7 +465,7 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: parser.add_argument( "--queue", type=str, - default="batch1", + default="pacerdoc1", help="The celery queue where the tasks should be processed.", ) parser.add_argument( diff --git a/cl/corpus_importer/tasks.py b/cl/corpus_importer/tasks.py index e00092b791..921937e2d3 100644 --- a/cl/corpus_importer/tasks.py +++ b/cl/corpus_importer/tasks.py @@ -397,8 +397,9 @@ def get_and_save_free_document_report( return PACERFreeDocumentLog.SCRAPE_FAILED raise self.retry(exc=exc, countdown=5) - if log_id: - # We only save the html when the script is run automatically every day + if log_id and not settings.DEVELOPMENT: + # We only save the html when the script is run automatically every day and + # not in development environment log = PACERFreeDocumentLog.objects.get(pk=log_id) if hasattr(report, "responses_with_params"): for result in report.responses_with_params: @@ -444,7 +445,6 @@ def get_and_save_free_document_report( @app.task(bind=True, max_retries=5, ignore_result=True) -@throttle_task("1/4s", key="court_id") def process_free_opinion_result( self, row_pk: int, @@ -595,7 +595,6 @@ def process_free_opinion_result( interval_step=5, ignore_result=True, ) -@throttle_task("1/6s", key="court_id") def get_and_process_free_pdf( self: Task, data: TaskData, diff --git a/cl/corpus_importer/utils.py b/cl/corpus_importer/utils.py index e97668f56e..efcab347a5 100644 --- a/cl/corpus_importer/utils.py +++ b/cl/corpus_importer/utils.py @@ -1145,6 +1145,8 @@ class CycleChecker: def __init__(self) -> None: self.court_counts: defaultdict = defaultdict(int) self.current_iteration: int = 1 + self.count_prev_iteration_courts: int = 0 + self.prev_iteration_courts: set = set() def check_if_cycled(self, court_id: str) -> bool: """Check if the cycle repeated @@ 
-1153,10 +1155,19 @@ def check_if_cycled(self, court_id: str) -> bool: :return True if the cycle started over, else False """ self.court_counts[court_id] += 1 + if self.court_counts[court_id] == self.current_iteration: + # Keep track of processed courts in last cycle + self.prev_iteration_courts.add(court_id) return False else: # Finished cycle and court has been seen more times than the # iteration count. Bump the iteration count and return True. self.current_iteration += 1 + self.count_prev_iteration_courts = len(self.prev_iteration_courts) + # Clear set of previous courts processed + self.prev_iteration_courts.clear() + # Add the first item for the new iteration, + # when self.court_counts[court_id] != self.current_iteration + self.prev_iteration_courts.add(court_id) return True