
feat(taskworker) Add concurrent worker #83254

Merged

markstory merged 11 commits from feat-taskworker-concurrent into master on Jan 17, 2025
Conversation

markstory
Member

Move the taskworker process to be a multiprocess concurrent worker. This will help enable higher CPU usage in worker pods, as we can pack more concurrent CPU operations into each pod (at the cost of memory).

The main process is responsible for:

  • Spawning children
  • Making RPC requests to fill child queues and submit results.

Each child process handles:

  • Resolving task names
  • Checking at_most_once keys
  • Enforcing processing deadlines
  • Executing task functions

Instead of using more child processes to enforce timeouts, I've used SIGALRM. I've verified that tasks like

```python
@exampletasks.register(name="examples.infinite", retry=Retry(times=2))
def infinite_task() -> None:
    try:
        while True:
            pass
    except Exception as e:
        print("haha caught exception", e)
```

do not paralyze workers with infinite loops.
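
For illustration, a rough sketch of what SIGALRM-based deadline enforcement can look like. The `ProcessingDeadlineExceeded` error and `run_with_deadline` helper are hypothetical names, not taken from this PR; deriving the error from `BaseException` is one way to keep a task's own `except Exception` handler from swallowing the deadline.

```python
# Hypothetical sketch of SIGALRM-based deadline enforcement (Unix-only;
# the alarm must be set from the main thread of the child process).
import signal


class ProcessingDeadlineExceeded(BaseException):
    """Raised when a task runs past its processing deadline."""


def _handle_alarm(signum, frame):
    raise ProcessingDeadlineExceeded("processing_deadline exceeded")


def run_with_deadline(func, deadline_seconds: int):
    signal.signal(signal.SIGALRM, _handle_alarm)
    signal.alarm(deadline_seconds)  # SIGALRM fires after deadline_seconds
    try:
        return func()
    finally:
        signal.alarm(0)  # always clear any pending alarm
```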

When a worker is terminated, it uses an `Event` to have children exit, and then drains any results. Tasks still in the `_child_tasks` queue will not be completed; they will instead be sent to another worker when the `processing_deadline` on their activations expires.
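
A sketch of that shutdown sequence, reusing the hypothetical names from the sketch above plus an assumed `submit_results` RPC helper:

```python
# Hypothetical shutdown path: signal children, join them, drain results.
import queue


def shutdown(children, results, shutdown_event, submit_results):
    shutdown_event.set()
    for proc in children:
        proc.join(timeout=5)
    drained = []
    while True:
        try:
            drained.append(results.get_nowait())
        except queue.Empty:
            break
    if drained:
        submit_results(drained)  # hypothetical RPC call to report completions
    # Anything still in child_tasks is abandoned here; the broker re-issues
    # those activations once their processing_deadline expires.
```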

@markstory markstory requested a review from a team as a code owner January 10, 2025 19:57
@markstory markstory linked an issue Jan 10, 2025 that may be closed by this pull request
@github-actions github-actions bot added the Scope: Backend Automatically applied to PRs that change backend components label Jan 10, 2025

codecov bot commented Jan 10, 2025

Codecov Report

Attention: Patch coverage is 87.78626% with 32 lines in your changes missing coverage. Please review.

✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/sentry/taskworker/worker.py | 80.25% | 31 Missing ⚠️ |
| tests/sentry/taskworker/test_worker.py | 99.04% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #83254      +/-   ##
==========================================
+ Coverage   87.49%   87.56%   +0.06%     
==========================================
  Files        9404     9393      -11     
  Lines      537180   536973     -207     
  Branches    21133    21048      -85     
==========================================
+ Hits       470024   470203     +179     
+ Misses      66798    66414     -384     
+ Partials      358      356       -2     

src/sentry/taskworker/worker.py (outdated review thread, resolved)
Diff context under discussion:

```python
task = self._get_known_task(activation)
if not task:
    ...
try:
    activation = child_tasks.get_nowait()
```
Member

I don't think get_nowait is necessarily correct here. I understand this is ensuring that the process doesn't block while waiting for a task before checking for the shutdown, but I think some kind of timeout/delay would be good here to avoid spiking the CPU. Maybe like 100ms or something like that?

Member Author

Good point about the potential CPU burn on an empty queue. I'll put a blocking get with a timeout in.
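
For reference, a sketch of the resulting child loop with a blocking `get` and the ~100ms timeout suggested above (the surrounding names are hypothetical):

```python
import queue


def child_loop(child_tasks, shutdown_event, execute):
    # Block briefly instead of spinning on get_nowait(), so an idle child
    # sleeps between checks of the shutdown event.
    while not shutdown_event.is_set():
        try:
            activation = child_tasks.get(timeout=0.1)
        except queue.Empty:
            continue
        execute(activation)  # resolve the task and run it
```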

src/sentry/taskworker/worker.py (review thread, resolved)
Co-authored-by: Evan Hicks <evanh@users.noreply.github.com>
@markstory markstory merged commit 2a26aed into master Jan 17, 2025
49 checks passed
@markstory markstory deleted the feat-taskworker-concurrent branch January 17, 2025 14:28

sentry-io bot commented Jan 17, 2025

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ AssertionError: expected call not found. pytest.runtest.protocol tests/sentry/taskworker...


andrewshie-sentry pushed a commit that referenced this pull request Jan 22, 2025
Labels
Scope: Backend Automatically applied to PRs that change backend components
Development

Successfully merging this pull request may close these issues.

Make taskworker worker support concurrent work