Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement MultiProcessingStage #1878

Merged

Conversation

yczhang-nv
Copy link
Contributor

@yczhang-nv yczhang-nv commented Aug 29, 2024

Description

Implemented SharedProcessPool and MultiProcessingStage.

SharedProcessPool

  • A singleton process pool that manages tasks from multiple stages, each with its own queue. Allows setting stage-specific process usage to cap the number of workers a stage can utilize.
  • The total number of workers is determined by the environment variable SHARED_PROCESS_POOL_CPU_USAGE, which is multiplied by the system’s available CPU cores (os.sched_getaffinity(0)).
    • If the variable is not set, the default is 50% of available CPUs.
  • Accepts process function with arbitrary argument signatures.

MultiProcessingStage

  • A versatile base stage that uses SharedProcessPool to handle incoming messages in a multiprocessing manner.
  • Accepts processing functions with a single argument, capable of handling dynamic input and output types. The types are deducted at runtime based on the provided processing function.
  • Two ways to use the stage:
    • Extend MultiProcessingBaseStage and implement the data processing logic in the _on_data() method.
    • Use MultiProcessingStage.create() by passing in the processing function directly.
  • Able to run multiple instances of the stage in the same pipeline.

Closes #1850

By Submitting this PR I confirm:

  • I am familiar with the Contributing Guidelines.
  • When the PR is ready for review, new or existing tests cover these changes.
  • When the PR is ready for review, the documentation is up to date with these changes.

@yczhang-nv yczhang-nv requested review from a team as code owners August 29, 2024 23:34
@yczhang-nv yczhang-nv self-assigned this Aug 29, 2024
@yczhang-nv yczhang-nv added non-breaking Non-breaking change feature request New feature or request labels Aug 29, 2024
@yczhang-nv yczhang-nv marked this pull request as draft September 10, 2024 16:45
@yczhang-nv yczhang-nv marked this pull request as ready for review September 18, 2024 19:30
Copy link
Contributor

@mdemoret-nv mdemoret-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very close. Need a few tweaks to how stop/join works. Also, can you add more type annotations to this PR? For any public function, type annotations are helpful.

tests/test_multi_processing_stage.py Show resolved Hide resolved
python/morpheus/morpheus/utils/shared_process_pool.py Outdated Show resolved Hide resolved
tests/utils/test_shared_process_pool.py Outdated Show resolved Hide resolved
tests/utils/test_shared_process_pool.py Outdated Show resolved Hide resolved
python/morpheus/morpheus/utils/shared_process_pool.py Outdated Show resolved Hide resolved
python/morpheus/morpheus/utils/shared_process_pool.py Outdated Show resolved Hide resolved
python/morpheus/morpheus/utils/shared_process_pool.py Outdated Show resolved Hide resolved
Copy link
Contributor

@mdemoret-nv mdemoret-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better implementation with the condition variables. Looks good.

tests/utils/test_shared_process_pool.py Outdated Show resolved Hide resolved
python/morpheus/morpheus/utils/shared_process_pool.py Outdated Show resolved Hide resolved
@yczhang-nv
Copy link
Contributor Author

/merge

@rapids-bot rapids-bot bot merged commit fdfcbfc into nv-morpheus:branch-24.10 Sep 26, 2024
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request non-breaking Non-breaking change
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

[FEA]: Add Multi-Processing Stage
2 participants