Instantiate one stream per thread #21
📝 Walkthrough
The changes introduced in this pull request enhance the
Changes
Sequence Diagram(s)
sequenceDiagram
participant Thread
participant ConcurrentDeclarativeSource
participant CursorPartitionGenerator
Thread->>ConcurrentDeclarativeSource: Request new stream instance
ConcurrentDeclarativeSource->>ConcurrentDeclarativeSource: Call _new_stream_instance_factory()
ConcurrentDeclarativeSource->>Thread: Return stream_factory
Thread->>CursorPartitionGenerator: Instantiate with stream_factory
CursorPartitionGenerator->>CursorPartitionGenerator: Generate stream using stream_factory()
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
unit_tests/sources/streams/concurrent/test_adapters.py (2)
Line range hint 471-477: Consider adding thread-safety verification
While the test verifies basic functionality, it could be enhanced to explicitly verify that each thread gets a unique stream instance.
Consider adding a test case that:
- Creates multiple partitions concurrently
- Verifies each partition has a unique stream instance
- Ensures no stream instance is shared between partitions
Example addition:
def test_cursor_partition_generator_thread_safety():
    stream_instances = set()
    stream_factory = Mock(side_effect=lambda: Mock(id=len(stream_instances)))
    # ... setup other dependencies ...
    partitions = list(partition_generator.generate())
    # Verify each partition has a unique stream instance
    for partition in partitions:
        stream_instances.add(id(partition._stream))
    assert len(stream_instances) == len(partitions), "Partitions should have unique stream instances"
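The suggested test leaves its setup elided, so here is a minimal self-contained sketch of the same check. `FakePartition` and `FakePartitionGenerator` are hypothetical stand-ins, not the real `StreamPartition` or `CursorPartitionGenerator`; they only model the "one factory call per slice" behavior under discussion.

```python
from unittest.mock import Mock, call


class FakePartition:
    """Hypothetical stand-in for a partition; it only keeps the stream it was given."""

    def __init__(self, stream, stream_slice):
        self._stream = stream
        self._slice = stream_slice


class FakePartitionGenerator:
    """Hypothetical stand-in for a partition generator: one factory call per slice."""

    def __init__(self, stream_factory, slices):
        self._stream_factory = stream_factory
        self._slices = slices

    def generate(self):
        for stream_slice in self._slices:
            yield FakePartition(self._stream_factory(), dict(stream_slice))


def check_unique_stream_per_partition():
    # With a callable side_effect, every call to the factory returns a fresh Mock
    stream_factory = Mock(side_effect=lambda: Mock(name="stream"))
    slices = [{"start": 1, "end": 2}, {"start": 3, "end": 4}, {"start": 5, "end": 6}]
    generator = FakePartitionGenerator(stream_factory, slices)

    partitions = list(generator.generate())

    # The list keeps every partition alive, so id() values cannot be recycled
    stream_ids = {id(partition._stream) for partition in partitions}
    assert len(stream_ids) == len(partitions), "Partitions should have unique stream instances"
    # The factory should be called exactly once per partition, with no arguments
    stream_factory.assert_has_calls([call() for _ in partitions])
    assert stream_factory.call_count == len(partitions)
    return len(stream_ids)
```

Against the real classes, the stand-ins would be replaced by the actual generator and its remaining constructor arguments, which are not shown here.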
Line range hint 478-485: Enhance assertions for factory pattern verification
The current assertions verify partition types and slice content, but could be extended to verify the factory pattern implementation.
Add assertions to verify:
- The stream_factory is called for each partition
- Each partition receives a new stream instance
# Add after existing assertions
stream_factory.assert_has_calls([call() for _ in partitions])
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
403-407: Remove reference to unused parameter in docstring
The docstring still references the sync_mode parameter, which is no longer part of the method signature.
- :param sync_mode: The synchronization mode.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
airbyte_cdk/sources/streams/concurrent/adapters.py (4 hunks)
unit_tests/sources/streams/concurrent/test_adapters.py (2 hunks)
🔇 Additional comments (5)
unit_tests/sources/streams/concurrent/test_adapters.py (1)
451-451: LGTM: Variable renamed to reflect factory pattern
The renaming from stream to stream_factory accurately reflects the architectural change to use a factory pattern for creating stream instances per thread.
airbyte_cdk/sources/streams/concurrent/adapters.py (3)
9-9: LGTM: Import changes are appropriate
The addition of Callable to the typing imports is necessary for type-hinting the new stream_factory parameter.
393-393: LGTM: Stream factory parameter change
The change from stream to stream_factory: Callable[[], Stream] aligns with the PR objective of instantiating one stream per thread. The instance variable is properly stored.
Also applies to: 408-408
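For readers unfamiliar with the pattern, the sketch below shows what accepting a `Callable[[], Stream]` instead of a `Stream` looks like in isolation. The `Stream` and `PartitionGenerator` classes here are simplified, hypothetical placeholders rather than the CDK's own types.

```python
from typing import Callable, Iterable, List


class Stream:
    """Hypothetical minimal stream; the real class carries far more behavior."""

    def __init__(self, name: str) -> None:
        self.name = name


class PartitionGenerator:
    """Stores a zero-argument factory instead of a ready-made stream instance."""

    def __init__(self, stream_factory: Callable[[], Stream], slice_count: int) -> None:
        self._stream_factory = stream_factory
        self._slice_count = slice_count

    def generate(self) -> Iterable[Stream]:
        for _ in range(self._slice_count):
            # Calling the factory here gives every partition its own object
            yield self._stream_factory()


streams: List[Stream] = list(PartitionGenerator(lambda: Stream("users"), 3).generate())
```

Because the generator only holds the factory, the decision of how a stream is built stays with the caller.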
448-448: Verify thread safety of stream factory
The change to use self._stream_factory() correctly ensures each partition gets its own stream instance. However, we should verify that:
- The stream factory itself is thread-safe
- The stream creation process doesn't have any shared state that could cause thread safety issues
✅ Verification successful
Let me analyze the review comment and the provided code context.
The review comment suggests verifying that the stream factory is thread-safe when using self._stream_factory() instead of self._stream in the generate method.
Looking at the code, I can see that:
- The stream factory is created in ConcurrentDeclarativeSource._new_stream_instance_factory(), which creates a new stream instance per thread to avoid threads updating the same field for a specific instance.
- The factory is used in CursorPartitionGenerator to create new stream instances for each partition.
- The factory implementation ensures thread safety by:
  - Creating a new stream instance for each call
  - Using the source's streams() method which creates fresh instances
  - Not sharing any mutable state between instances
- The code has been designed specifically to handle concurrent access by creating new stream instances rather than sharing them.
Based on this analysis, I can conclude that the stream factory implementation is thread-safe by design.
Stream factory is thread-safe
The change from self._stream to self._stream_factory() correctly ensures thread safety by:
- Creating a new stream instance for each partition
- Not sharing any mutable state between stream instances
- Using a factory pattern that creates fresh instances per thread
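To make the "fresh instance per thread" claim concrete, here is a small sketch using the standard `threading` module. `StatefulStream` and `run_workers` are hypothetical names invented for illustration; the point is only that a mutation made through a thread's own instance cannot be overwritten by another thread.

```python
import threading
from typing import Callable, Dict, Optional


class StatefulStream:
    """Hypothetical stream with a mutable field, like a stateful declarative component."""

    def __init__(self) -> None:
        self.current_slice: Optional[int] = None


def run_workers(stream_factory: Callable[[], StatefulStream], worker_count: int) -> Dict[int, Optional[int]]:
    observed: Dict[int, Optional[int]] = {}
    lock = threading.Lock()  # guards only the shared result dict

    def worker(worker_id: int) -> None:
        stream = stream_factory()         # one instance per thread
        stream.current_slice = worker_id  # mutation touches only this thread's instance
        with lock:
            observed[worker_id] = stream.current_slice

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return observed


observed = run_workers(StatefulStream, 8)
```

If the workers shared one `StatefulStream`, the value each thread reads back would depend on scheduling; with a factory, each thread always sees its own write.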
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any shared state or synchronization mechanisms in stream factory implementations
rg -l "(?:threading\.|Lock|RLock|Semaphore|Event|Condition|@synchronized|@lock)" | \
xargs -I {} sh -c 'echo "=== {} ==="; cat {}'
# Search for any static/class variables in stream implementations that could be shared
ast-grep --pattern 'class $CLASS {
$$$
@classmethod
def $METHOD($$$) {
$$$
}
$$$
}'
Length of output: 605931
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
6-6: Approved: Import Callable for type annotations
The addition of Callable to the typing imports is appropriate for type hinting the new method that returns a factory function.
def _new_stream_instance_factory(self, stream_to_copy: Stream, config: Mapping[str, Any]) -> Callable[[], Stream]:
    """
    Some of the declarative components are stateful. Therefore, we create one stream per thread in order to avoid threads updating
    the same field for a specific instance.
    """

    def _create_new_stream() -> Stream:
        streams_with_same_name = list(filter(lambda stream: stream.name == stream_to_copy.name, self.streams(config)))
        if len(streams_with_same_name) == 1:
            return streams_with_same_name[0]
        raise ValueError(f"Expected one stream with name `{stream_to_copy.name}` but got {len(streams_with_same_name)}")

    return _create_new_stream
Critical Issue: _new_stream_instance_factory returns existing instances instead of creating new ones
The _new_stream_instance_factory method is intended to generate a factory function that creates a new stream instance per thread, preventing concurrent modifications to shared state. However, the current implementation returns existing stream instances from self.streams(config), which may lead to threads sharing the same instance and thus not resolving the concurrency issues.
To fix this issue, the factory function _create_new_stream should instantiate a new stream instance instead of returning an existing one. This can be achieved by recreating the stream using the stream definition and the component factory.
Consider modifying the _create_new_stream function as follows:
 def _create_new_stream() -> Stream:
-    streams_with_same_name = list(filter(lambda stream: stream.name == stream_to_copy.name, self.streams(config)))
-    if len(streams_with_same_name) == 1:
-        return streams_with_same_name[0]
-    raise ValueError(f"Expected one stream with name `{stream_to_copy.name}` but got {len(streams_with_same_name)}")
+    stream_definitions = [stream_def for stream_def in self._source_config.get('streams', []) if stream_def.get('name') == stream_to_copy.name]
+    if len(stream_definitions) == 1:
+        return self._constructor.create_component(
+            model_type=DeclarativeStream,
+            component_definition=stream_definitions[0],
+            config=config,
+        )
+    raise ValueError(f"Expected one stream definition with name `{stream_to_copy.name}` but got {len(stream_definitions)}")
This modification ensures that a new instance of the stream is created each time the factory function is called, thus avoiding shared state between threads.
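The difference between the two factory shapes can be checked with object identity. The sketch below is a simplified illustration with hypothetical helpers (`build_stream`, `factory_from_cache`, `factory_from_definitions`); it does not use the CDK's component factory. Whether `self.streams(config)` returns cached objects or rebuilds them on every call is not shown in this excerpt, so the sketch covers both behaviors.

```python
from typing import Any, Callable, Dict, List


class Stream:
    """Hypothetical minimal stream."""

    def __init__(self, name: str) -> None:
        self.name = name


def build_stream(definition: Dict[str, Any]) -> Stream:
    """Hypothetical builder standing in for constructing a stream from its definition."""
    return Stream(definition["name"])


def factory_from_cache(cached: List[Stream], name: str) -> Callable[[], Stream]:
    def _get() -> Stream:
        matches = [stream for stream in cached if stream.name == name]
        if len(matches) == 1:
            return matches[0]  # the same object on every call
        raise ValueError(f"Expected one stream with name `{name}` but got {len(matches)}")

    return _get


def factory_from_definitions(definitions: List[Dict[str, Any]], name: str) -> Callable[[], Stream]:
    def _create() -> Stream:
        matches = [d for d in definitions if d.get("name") == name]
        if len(matches) == 1:
            return build_stream(matches[0])  # a new object on every call
        raise ValueError(f"Expected one stream definition with name `{name}` but got {len(matches)}")

    return _create


definitions = [{"name": "users"}, {"name": "orders"}]
shared = factory_from_cache([build_stream(d) for d in definitions], "users")
fresh = factory_from_definitions(definitions, "users")
```

Calling `shared()` twice yields the same object, while calling `fresh()` twice yields two distinct objects. A lookup over a list that is itself rebuilt on each call would behave like `fresh`.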
Committable suggestion skipped: line range outside the PR's diff.
Feel free to just dismiss this if not applicable - or if out of scope, you can suggest the AI to move to a new issue for later follow-up.
This comment was marked as off-topic.
@maxi297 - I pulled in the latest from |
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
Line range hint 408-414: Should we add error handling for stream factory calls?
What do you think about adding error handling for potential factory failures? Something like:
 def generate(self) -> Iterable[Partition]:
     for slice_start, slice_end in self._cursor.generate_slices():
         stream_slice = StreamSlice(...)
+        try:
+            stream = self._stream_factory()
+        except Exception as e:
+            raise RuntimeError(f"Failed to create stream instance: {e}") from e
         yield StreamPartition(
-            self._stream_factory(),
+            stream,
             copy.deepcopy(stream_slice),
             ...
         )
This would provide clearer error messages if stream instantiation fails. WDYT?
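As a runnable illustration of the wrapping idea, the sketch below uses a plain generator function in place of the real `generate` method. `generate_partitions`, `failing_factory`, and `describe_failure` are hypothetical names; the only behavior shown is that `raise ... from e` keeps the original exception available as `__cause__`.

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple


def generate_partitions(
    stream_factory: Callable[[], object], slices: Iterable[Dict[str, int]]
) -> Iterator[Tuple[object, Dict[str, int]]]:
    for stream_slice in slices:
        try:
            stream = stream_factory()
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback
            raise RuntimeError(f"Failed to create stream instance: {e}") from e
        yield stream, dict(stream_slice)


def failing_factory() -> object:
    raise ValueError("bad manifest")


def describe_failure() -> str:
    try:
        list(generate_partitions(failing_factory, [{"start": 1}]))
    except RuntimeError as err:
        return f"{err} (cause: {type(err.__cause__).__name__})"
    return "no error"
```

Because the function is a generator, the error surfaces when the partitions are consumed rather than when `generate_partitions` is called.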
Line range hint 448-455: Consider reusing the stream slice copy?
I notice we're doing a deep copy of the stream slice before passing it to StreamPartition. Since StreamPartition also does a deep copy internally, we might be duplicating effort. What do you think about this optimization:
+slice_copy = copy.deepcopy(stream_slice)
 yield StreamPartition(
     self._stream_factory(),
-    copy.deepcopy(stream_slice),
+    slice_copy,
     self.message_repository,
     self._sync_mode,
     self._cursor_field,
     self._state,
     self._cursor,
 )
This would prevent double copying of the slice data. WDYT?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
airbyte_cdk/sources/streams/concurrent/adapters.py (4 hunks)
unit_tests/sources/streams/concurrent/test_adapters.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/streams/concurrent/test_adapters.py
🔇 Additional comments (5)
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
9-9: LGTM! Clean import addition.
The addition of Callable to the typing imports aligns well with the new stream factory pattern implementation.
Line range hint 393-403: Great use of the factory pattern for thread safety! 👍
The change from direct stream instance to a factory pattern is a solid improvement for concurrent execution. Each thread getting its own stream instance prevents potential race conditions.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
6-6: LGTM!
Clean addition of Callable to typing imports.
232-232: LGTM!
Good use of the factory pattern in CursorPartitionGenerator instantiation.
348-358: Should we create new stream instances instead of reusing existing ones?
The current implementation returns existing stream instances from self.streams(config), which could lead to thread safety issues as multiple threads would share the same stateful components. The docstring indicates we want "one stream per thread", but the implementation doesn't achieve this.
Would you consider creating new stream instances instead? Here's a suggestion that aligns better with the docstring's intent, wdyt?
 def _new_stream_instance_factory(self, stream_to_copy: Stream, config: Mapping[str, Any]) -> Callable[[], Stream]:
     """
     Some of the declarative components are stateful. Therefore, we create one stream per thread in order to avoid threads updating
     the same field for a specific instance.
     """
     def _create_new_stream() -> Stream:
-        streams_with_same_name = list(filter(lambda stream: stream.name == stream_to_copy.name, self.streams(config)))
-        if len(streams_with_same_name) == 1:
-            return streams_with_same_name[0]
-        raise ValueError(f"Expected one stream with name `{stream_to_copy.name}` but got {len(streams_with_same_name)}")
+        stream_definitions = [
+            stream_def for stream_def in self._source_config.get('streams', [])
+            if stream_def.get('name') == stream_to_copy.name
+        ]
+        if len(stream_definitions) == 1:
+            return self._constructor.create_component(
+                model_type=DeclarativeStream,
+                component_definition=stream_definitions[0],
+                config=config,
+            )
+        raise ValueError(f"Expected one stream definition with name `{stream_to_copy.name}` but got {len(stream_definitions)}")
     return _create_new_stream
This approach:
- Uses stream definitions to create fresh instances
- Ensures thread safety by avoiding shared state
- Better aligns with the docstring's intent
Let's verify the stream definition structure:
✅ Verification successful
Verification Successful: Stream definitions align with the expected structure.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if stream definitions contain the expected structure
# Expected: Stream definitions should contain 'name' field and be compatible with DeclarativeStream
rg -A 5 '"streams":\s*\[' .
Length of output: 191835
Implemented as part of ddd299e
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
CursorPartitionGenerator to clarify changes related to stream instantiation.
Tests