feat(taskworker) Add concurrent worker (#83254)
Move the taskworker process to be a multiprocess concurrent worker. This
will help enable higher CPU usage in worker pods, as we can pack more
concurrent CPU operations into each pod (at the cost of memory).

The main process is responsible for:

- Spawning children
- Making RPC requests to fill child queues and submit results.

Each child process handles:

- Resolving task names
- Checking at_most_once keys
- Enforcing processing deadlines
- Executing task functions
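
The split of responsibilities above can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: `TASKS`, `child_main`, and `run` are hypothetical names, the RPC calls are replaced by an in-memory list of activations, and the `fork` start method is assumed (Unix only). The at_most_once check and deadline enforcement are left out.

```python
import multiprocessing
import queue


def double(x):
    return x * 2


# Hypothetical registry standing in for task-name resolution.
TASKS = {"examples.double": double}


def child_main(child_tasks, processed, shutdown):
    """Child process: resolve the task name, execute it, report a status."""
    while not shutdown.is_set():
        try:
            task_id, name, arg = child_tasks.get(timeout=0.1)
        except queue.Empty:
            continue
        func = TASKS.get(name)
        if func is None:
            # Unknown task name: report a failure instead of crashing.
            processed.put((task_id, "failure", None))
            continue
        try:
            processed.put((task_id, "complete", func(arg)))
        except Exception:
            processed.put((task_id, "failure", None))


def run(activations, concurrency=2):
    """Main process: spawn children, fill their queue, collect results."""
    ctx = multiprocessing.get_context("fork")  # assumption: Unix host
    child_tasks, processed = ctx.Queue(), ctx.Queue()
    shutdown = ctx.Event()
    children = [
        ctx.Process(target=child_main, args=(child_tasks, processed, shutdown))
        for _ in range(concurrency)
    ]
    for child in children:
        child.start()
    # A real worker would fetch these over RPC; here they come from a list.
    for activation in activations:
        child_tasks.put(activation)
    results = {}
    for _ in activations:
        task_id, status, value = processed.get(timeout=10)
        results[task_id] = (status, value)
    shutdown.set()
    for child in children:
        child.join(timeout=5)
    return results
```

Keeping all network I/O in the parent means the children only ever touch two queues and an event, which is what lets several of them run CPU-bound work side by side.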

Instead of using more child processes to enforce timeouts, I've used
SIGALRM. I've verified that tasks like

```python
@exampletasks.register(name="examples.infinite", retry=Retry(times=2))
def infinite_task() -> None:
    try:
        while True:
            pass
    except Exception as e:
        print("haha caught exception", e)
```

do not paralyze workers with infinite loops.
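
For readers unfamiliar with the technique, here is a minimal sketch of a SIGALRM-based deadline. The names `DeadlineExceeded` and `run_with_deadline` are hypothetical, and deriving the timeout error from `BaseException` is an assumption of this sketch (the commit message does not say how the real exception is defined). With that choice, the `except Exception` in the task body cannot swallow the timeout.

```python
import signal


class DeadlineExceeded(BaseException):
    """Hypothetical timeout error.

    Assumption: it derives from BaseException so that `except Exception`
    blocks inside task code do not catch it.
    """


def _on_alarm(signum, frame):
    # Runs in the main thread when SIGALRM is delivered, interrupting
    # whatever Python code the task was executing.
    raise DeadlineExceeded("processing deadline exceeded")


def run_with_deadline(func, deadline_seconds):
    """Run func(), returning "failure" if it outlives the deadline."""
    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(deadline_seconds)  # whole seconds only
    try:
        func()
        return "complete"
    except DeadlineExceeded:
        return "failure"
    finally:
        signal.alarm(0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)


def infinite_task():
    try:
        while True:
            pass
    except Exception as e:
        print("caught exception", e)
```

`signal.signal` can only be called from the main thread and `SIGALRM` is not available on Windows, so this approach fits a design where each child process runs one task at a time on its main thread.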

When a worker is terminated, it uses an `Event` to have children exit,
and then drains any results. Any tasks still in the `_child_tasks`
queue will not be completed; instead they will be sent to another worker
when the `processing_deadline` on the activations expires.
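
The shutdown sequence can be illustrated with a small sketch. Everything here is hypothetical (`run_then_shutdown`, `drain`, the integer task ids, and the `fork` start method); it only shows the shape of "set the event, let children exit, drain results", and that whatever remains in the task queue is simply left behind.

```python
import multiprocessing
import queue
import time


def child_main(child_tasks, processed, shutdown):
    """Work until the shutdown event is set, finishing the task in hand."""
    while not shutdown.is_set():
        try:
            task_id = child_tasks.get(timeout=0.1)
        except queue.Empty:
            continue
        time.sleep(0.05)  # stand-in for executing the task
        processed.put(task_id)


def drain(q):
    """Collect items until the queue stays empty for half a second."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=0.5))
        except queue.Empty:
            return items


def run_then_shutdown(task_ids, run_for=0.2, concurrency=2):
    ctx = multiprocessing.get_context("fork")  # assumption: Unix host
    child_tasks, processed = ctx.Queue(), ctx.Queue()
    shutdown = ctx.Event()
    children = [
        ctx.Process(target=child_main, args=(child_tasks, processed, shutdown))
        for _ in range(concurrency)
    ]
    for child in children:
        child.start()
    for task_id in task_ids:
        child_tasks.put(task_id)
    time.sleep(run_for)  # let the children make some progress
    shutdown.set()  # ask children to stop after their current task
    # Results here are tiny, so children can flush them and exit before
    # the parent reads; with large results, drain before joining.
    for child in children:
        child.join(timeout=5)
    completed = drain(processed)  # results to report back
    abandoned = drain(child_tasks)  # never started by any child
    return completed, abandoned
```

In this sketch the abandoned ids are returned only so they can be inspected. In the design described above nothing is done with them: the broker redelivers those activations once their `processing_deadline` passes.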

---------

Co-authored-by: Evan Hicks <evanh@users.noreply.github.com>
2 people authored and andrewshie-sentry committed Jan 22, 2025
1 parent 45b7c2b commit a6f7771
Showing 4 changed files with 418 additions and 280 deletions.
9 changes: 7 additions & 2 deletions src/sentry/runner/commands/run.py
@@ -242,6 +242,7 @@ def worker(ignore_unknown_queues: bool, **options: Any) -> None:
@click.option(
"--max-task-count", help="Number of tasks this worker should run before exiting", default=10000
)
+@click.option("--concurrency", help="Number of child worker processes to create.", default=1)
@click.option(
"--namespace", help="The dedicated task namespace that taskworker operates on", default=None
)
@@ -258,7 +259,7 @@ def taskworker(**options: Any) -> None:


def run_taskworker(
-rpc_host: str, max_task_count: int, namespace: str | None, **options: Any
+rpc_host: str, max_task_count: int, namespace: str | None, concurrency: int, **options: Any
) -> None:
"""
taskworker factory that can be reloaded
@@ -267,7 +268,11 @@ def run_taskworker(

with managed_bgtasks(role="taskworker"):
worker = TaskWorker(
-rpc_host=rpc_host, max_task_count=max_task_count, namespace=namespace, **options
+rpc_host=rpc_host,
+max_task_count=max_task_count,
+namespace=namespace,
+concurrency=concurrency,
+**options,
)
exitcode = worker.start()
raise SystemExit(exitcode)
5 changes: 4 additions & 1 deletion src/sentry/taskworker/client.py
@@ -44,7 +44,10 @@ def get_task(self, namespace: str | None = None) -> TaskActivation | None:
return None

def update_task(
-self, task_id: str, status: TaskActivationStatus.ValueType, fetch_next_task: FetchNextTask
+self,
+task_id: str,
+status: TaskActivationStatus.ValueType,
+fetch_next_task: FetchNextTask | None = None,
) -> TaskActivation | None:
"""
Update the status for a given task activation.