feat: support parallel dataset processor services #220
base: main
Conversation
Pull Request Overview
This pull request introduces parallel dataset processing infrastructure to replace the previous composer-based architecture. The changes enable distributed dataset generation across multiple processor services, standardize dataset type handling, and add configuration and messaging support for the new processing model.
- Replaces composer architecture with distributed dataset processors: removes the old `ComposerFactory` and composer classes, introducing `DatasetProcessor` services that can run in parallel
- Standardizes dataset type handling: introduces a `DatasetType` enum to replace `ComposerType`, with improved validation between dataset types and file configurations
- Adds parallel processing infrastructure: implements job distribution, result aggregation, and ZMQ communication channels for dataset generation tasks (see the sketch after this list)
Reviewed Changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated 2 comments.
File | Description
---|---
`aiperf/dataset/processor.py` | New `DatasetProcessor` service that handles distributed dataset generation with support for synthetic, custom, and trace datasets
`aiperf/dataset/dataset_manager.py` | Updated to distribute dataset generation jobs across multiple processors and aggregate results
`aiperf/common/enums/dataset_enums.py` | Replaces `ComposerType` with `DatasetType` enum for clearer dataset categorization
`aiperf/common/config/input_config.py` | Adds `dataset_type` parameter with validation logic for dataset type and file combinations
`aiperf/common/messages/dataset_messages.py` | New message types for dataset processing communication between manager and processors (sketched below)
`aiperf/common/config/zmq_config.py` | Adds dataset job and result communication addresses/ports for ZMQ messaging
`tests/services/test_dataset_processor.py` | New test suite for dataset processor service functionality
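The actual definitions in `aiperf/common/messages/dataset_messages.py` are not shown in this review; a minimal sketch of the request/response shape, assuming Pydantic models and with all field names invented for illustration:

```python
# Illustrative request/response message shapes for manager <-> processor
# traffic; the field names are assumptions, not the actual aiperf definitions.
from pydantic import BaseModel

class ProcessSyntheticDatasetMessage(BaseModel):
    job_id: int
    num_conversations: int         # how many conversations this processor should generate
    mean_prompt_tokens: int = 550  # sampling parameter for synthetic prompts

class ProcessDatasetResponseMessage(BaseModel):
    job_id: int
    conversation_ids: list[str]    # IDs of the conversations produced for this job
```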
Comments suppressed due to low confidence (3)
aiperf/common/config/zmq_config.py:258
- The `dataset_job_address` property is missing from the `ZMQTCPConfig` class. This property is declared as abstract in the base class and implemented in `ZMQIPCConfig` but not in `ZMQTCPConfig`.

Context: `"""Get the credit return address based on protocol configuration."""`
aiperf/common/config/zmq_config.py:269
- The `dataset_result_address` property is missing from the `ZMQTCPConfig` class. This property is declared as abstract in the base class and implemented in `ZMQIPCConfig` but not in `ZMQTCPConfig`.

Context: `return f"ipc://{self.path}/dataset_result.ipc"`
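Both comments flag the same omission. Mirroring the `ipc://` implementation shown in the context line, the missing TCP properties might look like the sketch below; the host and port attribute names are guesses at the surrounding config fields, not verified against the real class:

```python
# Hypothetical sketch of the two properties flagged as missing from
# ZMQTCPConfig. Attribute names (host, *_port) are assumptions.
from pydantic import BaseModel

class ZMQTCPConfig(BaseModel):
    host: str = "127.0.0.1"
    dataset_job_port: int = 5661
    dataset_result_port: int = 5662

    @property
    def dataset_job_address(self) -> str:
        return f"tcp://{self.host}:{self.dataset_job_port}"

    @property
    def dataset_result_address(self) -> str:
        return f"tcp://{self.host}:{self.dataset_result_port}"
```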
aiperf/dataset/processor.py:315
- The `max_tokens` field is being set directly instead of using the `_set_max_tokens` method. This bypasses the configuration-based token sampling logic and could lead to inconsistent behavior compared to other dataset types.

Context: `conversation.turns.append(turn)`
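The concern is about bypassing shared sampling logic. A hedged illustration of the difference follows; `_set_max_tokens` is shown here as a hypothetical helper, since its real signature and body are not visible in this review:

```python
# Illustrative only: contrasts direct assignment with routing through a
# shared helper. The helper body is a guess at config-based token sampling.
import random

class Turn:
    def __init__(self) -> None:
        self.max_tokens: int | None = None

def _set_max_tokens(turn: Turn, mean: int, stddev: int) -> None:
    # Centralized sampling keeps max_tokens consistent across dataset types.
    turn.max_tokens = max(1, int(random.gauss(mean, stddev)))

turn = Turn()
turn.max_tokens = 256                        # flagged: bypasses sampling logic
_set_max_tokens(turn, mean=256, stddev=32)   # preferred: shared code path
```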
```python
# composer = SyntheticDatasetComposer(audio_config, mock_tokenizer)
#
# assert composer.config.input.audio.length.mean == 2
# assert composer.include_image is False
```
The entire test file is commented out with a TODO note. This leaves the synthetic dataset processing functionality without proper test coverage, which could lead to undetected regressions.
Suggested change (replacing the commented-out line `# assert composer.include_image is False`):

```python
class TestSyntheticDatasetComposer:
    # ============================================================================
    # Initialization Tests
    # ============================================================================

    def test_initialization_basic_config(self, synthetic_config, mock_tokenizer):
        """Test that SyntheticDatasetComposer can be instantiated with basic config."""
        composer = SyntheticDatasetComposer(synthetic_config, mock_tokenizer)
        assert composer.config == synthetic_config
        assert composer.config.input.conversation.num == 5
        assert composer.prompt_generator is not None
        assert composer.include_image is False
        assert composer.include_audio is False

    def test_initialization_with_images(self, image_config, mock_tokenizer):
        """Test initialization with image generation enabled."""
        composer = SyntheticDatasetComposer(image_config, mock_tokenizer)
        assert composer.config.input.image.width.mean == 10
        assert composer.config.input.image.height.mean == 10
        assert composer.include_image is True
        assert composer.include_audio is False

    def test_initialization_with_audio(self, audio_config, mock_tokenizer):
        """Test initialization with audio generation enabled."""
        composer = SyntheticDatasetComposer(audio_config, mock_tokenizer)
        assert composer.config.input.audio.length.mean == 2
        # assert composer.include_image is False
```
```python
self.prompt_generator = prompt_generator
self.user_config = user_config
self._skipped_traces = 0

def __init__(self, user_config: UserConfig, **kwargs) -> None:
```
The constructor signature changed to remove specific parameters like `prompt_generator`, but the class may still need access to these components. Consider whether this breaks the loader's functionality or whether these dependencies should be injected differently.
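One way to address this concern without widening the signature again could be optional keyword injection with lazy construction. A sketch under assumed names (`CustomDatasetLoader` and `PromptGenerator` stand in for the real aiperf types):

```python
# Sketch of keeping prompt_generator injectable after the signature change.
# Class names and defaults are assumptions, not the aiperf implementation.
class UserConfig: ...
class PromptGenerator: ...

class CustomDatasetLoader:
    def __init__(self, user_config: UserConfig, **kwargs) -> None:
        self.user_config = user_config
        self._skipped_traces = 0
        # Accept an injected generator (e.g., for tests), or build one on demand.
        self.prompt_generator = kwargs.get("prompt_generator") or PromptGenerator()
```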
@ajcasagrande noticed this test being non-deterministic a while ago, and I made the fix in this PR.
I will have a separate follow-up PR that covers migrating and adding new tests, and refactoring some of the components in the dataset processor. Some unit tests for the custom dataset will fail because the functionality moved to the dataset processor.
Converted to draft to address one important issue pointed out by @ajcasagrande. With distributed dataset generation, we need to figure out how to create, maintain, and fetch from cached token blocks across multiple processes.
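For context on that caching issue, one possible direction (purely illustrative, not what this PR implements) is a manager-owned token-block cache that processors query over a ZMQ REQ/REP channel, so parallel processes share one cache rather than holding per-process copies:

```python
# Purely illustrative: a single cache service for token blocks, served over
# ZMQ REQ/REP. Message shape and address are assumptions for the sketch.
import zmq

def serve_token_block_cache(address: str = "tcp://*:5563") -> None:
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REP)
    sock.bind(address)
    cache: dict[str, list[int]] = {}
    while True:
        req = sock.recv_json()                    # {"key": ..., "tokens": optional}
        if "tokens" in req:
            cache[req["key"]] = req["tokens"]     # create or update a block
        sock.send_json({"tokens": cache.get(req["key"])})  # fetch (None if missing)
```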
Summary
This pull request introduces new infrastructure for dataset processing, standardizes dataset type handling, and adds new messaging and configuration options to support dataset processors. The changes lay the groundwork for scalable, configurable dataset generation and processing services within the system. Key updates include new enums, message types, configuration fields, and removal of the old composer abstraction.
Dataset Processing Infrastructure:
- New `DatasetProcessor` service type and related configuration (`dataset_processor_service_count`) to enable scalable dataset processing as a first-class service.
- New message types (`ProcessSyntheticDatasetMessage`, `ProcessDatasetResponseMessage`) to support communication between components involved in dataset generation and processing.

Configuration and CLI Enhancements:
- New `dataset_type` configuration and CLI parameter, with validation logic to ensure correct dataset type and file/custom type combinations (see the sketch after this list).

Enums and API Cleanup:
- New `DatasetType` enum (replacing `ComposerType`), updating imports and cleaning up references throughout the codebase.
- Removed `ComposerFactory` and related composer imports, reflecting the move away from the composer abstraction in favor of the new dataset processor model.
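A minimal sketch of the enum-plus-validation pattern this summary describes; the member names, `InputConfig` fields, and validator rules are assumptions for illustration, not the actual aiperf source:

```python
# Illustrative DatasetType enum and dataset_type/file validation.
from enum import Enum
from pydantic import BaseModel, model_validator

class DatasetType(str, Enum):
    SYNTHETIC = "synthetic"
    CUSTOM = "custom"
    TRACE = "trace"

class InputConfig(BaseModel):
    dataset_type: DatasetType = DatasetType.SYNTHETIC
    file: str | None = None

    @model_validator(mode="after")
    def _check_type_file_combination(self) -> "InputConfig":
        # Custom and trace datasets require an input file; synthetic must not have one.
        if self.dataset_type is not DatasetType.SYNTHETIC and not self.file:
            raise ValueError(f"dataset_type '{self.dataset_type.value}' requires an input file")
        if self.dataset_type is DatasetType.SYNTHETIC and self.file:
            raise ValueError("dataset_type 'synthetic' does not accept an input file")
        return self
```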
These changes collectively enable more flexible and robust dataset processing, with improved configuration, clearer API boundaries, and better support for scaling dataset generation to meet system demands.