Update pacer free document command to avoid high memory usage #4472

Open · wants to merge 4 commits into main
cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py
@@ -316,16 +316,32 @@ def get_pdfs(
     throttle = CeleryThrottle(queue_name=q)
     completed = 0
     cycle_checker = CycleChecker()
+    current_court = None
+    prev_court = None
     for row in rows.iterator():
         # Wait until the queue is short enough
         throttle.maybe_wait()

+        # Keep track of the current and previously processed court
+        prev_court = current_court
+        current_court = row.court_id
+
         if cycle_checker.check_if_cycled(row.court_id):
-            print(
-                f"Court cycle completed. Sleep 1 second before starting the next cycle."
-            )
-            time.sleep(1)
+            if prev_court != current_court:
+                # We are cycling through different courts; wait 1s before
+                # starting the next cycle
+                sleep = 1
+            else:
+                # We are cycling through the same court over and over; wait
+                # longer before queuing up more items from the same court
+                sleep = 3
+
+            logger.info(
+                f"Court cycle completed for: {row.court_id}. Current iteration: "
+                f"{cycle_checker.current_iteration}. Sleep {sleep} second(s) "
+                f"before starting the next cycle."
+            )
+            time.sleep(sleep)
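To make the diff above concrete, here is a minimal, illustrative sketch of a cycle checker that behaves the way the loop relies on: it reports a completed cycle whenever a court_id repeats within the current pass. The names (CycleChecker, check_if_cycled, current_iteration) mirror the diff, but this implementation is an assumption, not CourtListener's actual code.

```python
class CycleChecker:
    """Hypothetical sketch: detects when a court_id repeats, i.e. a full
    cycle over the distinct courts has completed."""

    def __init__(self) -> None:
        self.current_iteration = 1
        self._seen: set[str] = set()

    def check_if_cycled(self, court_id: str) -> bool:
        """Return True when court_id starts a new cycle (it was already seen
        in the current one), then reset the per-cycle state."""
        if court_id in self._seen:
            self._seen = {court_id}
            self.current_iteration += 1
            return True
        self._seen.add(court_id)
        return False


checker = CycleChecker()
results = [checker.check_if_cycled(c) for c in ["ca1", "ca2", "ca3", "ca1", "ca2"]]
# The fourth call sees "ca1" again, marking the start of cycle 2.
```

Under this sketch, `prev_court != current_court` distinguishes a genuine multi-court cycle (short sleep) from a single court repeating back-to-back (longer sleep).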
Contributor:
As we discussed, we could improve the sleep value here based on the number of courts being cycled through, to ensure we don't surpass the 1/4s per court scrape rate we previously had via the throttle_task decorator. We could consider the time it takes to process and download a document, then compute a dynamic value or threshold based on the number of courts being processed. This way, even when only a few courts remain in the list, we still maintain the 1/4s per court rate.

Member:

> 1/4s per court rate

Is that 0.25s per court, or am I misunderstanding?

Contributor:

That's 1 task every 4 seconds per court, according to the get_task_wait docstring.
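For readers unfamiliar with the rate-string convention being discussed, a small illustrative parser follows. This is an assumption about the format (n tasks per window, so "1/4s" means 1 task every 4 seconds and "1/6s" means 1 task every 6 seconds), not the actual get_task_wait implementation.

```python
import re


def seconds_per_task(rate: str) -> float:
    """Convert a throttle rate string like '1/4s' into the minimum number
    of seconds between tasks. Hypothetical sketch, not library code."""
    match = re.fullmatch(r"(\d+)/(\d*)([smh])", rate)
    if not match:
        raise ValueError(f"Bad rate string: {rate}")
    n, window, unit = match.groups()
    multiplier = {"s": 1, "m": 60, "h": 3600}[unit]
    # An omitted window size defaults to 1, e.g. "10/m" is 10 per minute.
    return (int(window or 1) * multiplier) / int(n)
```

So `seconds_per_task("1/4s")` gives 4.0 and `seconds_per_task("1/6s")` gives 6.0, matching the decorators removed in tasks.py below.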

Member:

Right, duh, thank you. So if sleep is set to four seconds, we'd hit each court at most every four seconds, right? But if we use some timing info, we can set this dynamically so that each loop takes exactly four seconds. Like, if downloads take 2s, then we set the sleep to 2s, and the 4s rate is achieved?

Contributor:

> Like, if downloads take 2s, then we set the sleep to 2s, and the 4s rate is achieved?

Yeah, that's right. I think Kevin already has some timing info we can use here. The other scenario we need to consider is when the number of courts with remaining documents to scrape is reduced.

ca1
ca2
ca3
ca1
ca2
ca3
...

In this case, with the current approach, we would schedule one task per court per second, which exceeds the previous rate of one task every 4 seconds per court. So the idea is to take the number of courts in the last cycle and the average time to process a document, and compute the sleep time for that cycle so that the rate for these courts stays at one task per 4 seconds or slower.
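The dynamic sleep being proposed can be sketched as follows. The names and the 4-second constant reflect the discussion above; this is an illustration of the idea, not the PR's code.

```python
MIN_SECONDS_PER_COURT = 4.0  # the rate the old throttle_task("1/4s") enforced


def compute_cycle_sleep(courts_in_cycle: int, avg_task_seconds: float) -> float:
    """Seconds to sleep after a cycle so that no court is scheduled more
    often than once every MIN_SECONDS_PER_COURT seconds.

    Hypothetical sketch: a full cycle already spends roughly
    courts_in_cycle * avg_task_seconds, so we only sleep the remainder.
    """
    elapsed = courts_in_cycle * avg_task_seconds
    return max(0.0, MIN_SECONDS_PER_COURT - elapsed)


# With 3 courts left and ~0.5s per task, a cycle takes ~1.5s, so sleep 2.5s.
# With 12 courts at 0.5s each, the cycle alone exceeds 4s: no sleep needed.
```

This keeps the per-court rate at or below one task per 4 seconds even as the list of courts with remaining documents shrinks.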

Member:

Got it. Sounds great!


         logger.info(f"Processing row id: {row.id} from {row.court_id}")
         c = chain(
             process_free_opinion_result.si(
                 row.pk,
cl/corpus_importer/tasks.py (2 changes: 0 additions & 2 deletions)
@@ -444,7 +444,6 @@ def get_and_save_free_document_report(


 @app.task(bind=True, max_retries=5, ignore_result=True)
-@throttle_task("1/4s", key="court_id")
 def process_free_opinion_result(
     self,
     row_pk: int,

@@ -595,7 +594,6 @@ def process_free_opinion_result(
     interval_step=5,
     ignore_result=True,
 )
-@throttle_task("1/6s", key="court_id")
 def get_and_process_free_pdf(
     self: Task,
     data: TaskData,