Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrent-cdk): Move the grouping of concurrent and synchronous streams into the read and discover commands instead of when initializing the source #130

Merged

Conversation

brianjlai
Copy link
Contributor

@brianjlai brianjlai commented Dec 5, 2024

Problem

We've seen a number of issues crop up due to the fact that in low-code sources, we instantiate the streams within the source's __init__() method. It also created issues for config migrations because we would instantiate a source's streams using an unmigrated config which might fail validations earlier than originally expected

Solution

This PR moves the logic of grouping and creating streams back into the read() and discover() where errors will properly be surfaced where the used to be prior to moving to the concurrent CDK.

todo :

  • unit tests passing
  • retest locally on asana or other manifest-only source with an invalid config which would not properly surface check failed message

Summary by CodeRabbit

  • New Features

    • Enhanced handling of concurrent and synchronous streams in data processing.
    • Simplified state management in the ConcurrentDeclarativeSource class.
  • Bug Fixes

    • Improved test coverage for JSON and JSONL decoders with larger datasets.
  • Tests

    • Updated tests for ConcurrentDeclarativeSource to align with new stream grouping logic.
    • Increased line count in test responses to better simulate larger datasets.

… and discover commands instead of when initializing the source
@brianjlai brianjlai requested review from maxi297 and pnilan December 5, 2024 06:27
Copy link
Contributor

coderabbitai bot commented Dec 5, 2024

📝 Walkthrough
📝 Walkthrough

Walkthrough

The pull request modifies the ConcurrentDeclarativeSource class by simplifying its state handling and stream management. The constructor now directly assigns state to self._state, removing previous conditional logic. The read method's signature has been updated, and its logic streamlined to utilize grouped streams from _group_streams. Additionally, significant changes were made to the test suites for both the JSON decoders and the ConcurrentDeclarativeSource, enhancing their ability to handle larger datasets and ensuring alignment with the new implementation.

Changes

File Change Summary
airbyte_cdk/sources/declarative/concurrent_declarative_source.py Simplified state handling, removed _concurrent_streams and _synchronous_streams, updated read method signature and logic, enhanced _group_streams method.
unit_tests/sources/declarative/decoders/test_json_decoder.py Increased lines_in_response from 2 to 2_000_000 in large_event_response_fixture for larger dataset simulation.
unit_tests/sources/declarative/test_concurrent_declarative_source.py Added mock initialization method, removed disable_emitting_sequential_state_messages, updated tests to use new stream grouping logic, adjusted method signatures with patch decorators.

Possibly related PRs

Suggested labels

chore, enhancement

Suggested reviewers

  • maxi297
  • aaronsteers

What do you think about these suggestions? wdyt?


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

89-90: Consider removing the state from initialization if it's no longer needed

Since streams are now grouped during the read() method, and the source might not need to store the initial state anymore, perhaps we can remove the state parameter from the __init__ method to simplify the initialization process. Wdyt?

unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)

1457-1462: Consider verifying stream names in assertions

In test_streams_with_stream_state_interpolation_should_be_synchronous, alongside checking the lengths of concurrent_streams and synchronous_streams, perhaps we could assert the specific stream names expected in each group. This would make the test more robust and clear. Wdyt?


1598-1601: Suggest enhancing assertions with stream names

In the test, after asserting the number of concurrent and synchronous streams, maybe we can also assert the names of these streams to ensure they are correctly grouped. This could improve test clarity. Wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 3e671b8 and f5da8f5.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
  • unit_tests/sources/declarative/decoders/test_json_decoder.py (1 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (11 hunks)
🔇 Additional comments (15)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)

126-126: Simplify the type hint for the state parameter

Changing the state parameter type hint to Optional[List[AirbyteStateMessage]] streamlines the method signature and improves readability. This seems appropriate.


128-129: Approve the use of _group_streams in read()

The call to self._group_streams(config=config) within the read() method enhances the stream grouping logic. This change looks good.


130-133: Comments improve code clarity

The added comments explain why concurrent stream names are saved before syncing synchronous streams. This helps in understanding the control flow.


158-158: Use of _group_streams in discover() method

Calling self._group_streams(config=config) in the discover() method ensures consistent stream grouping for discovery. This approach seems sound.

unit_tests/sources/declarative/test_concurrent_declarative_source.py (10)

9-9: Ensure all required imports are present

The import statement from unittest.mock import patch is correctly added for mocking purposes in the tests.


651-651: Update tests to use _group_streams method

Using concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) aligns the test with the updated implementation. This change looks appropriate.


708-708: Approve stream grouping in test_create_concurrent_cursor

Grouping streams within the test ensures that the cursors are tested in the context of concurrent and synchronous streams. Good job on updating the test.


710-710: Confirming the use of first concurrent stream

Assigning party_members_stream = concurrent_streams[0] is appropriate for testing the party members stream cursor.


726-726: Accessing the correct concurrent stream

Assigning locations_stream = concurrent_streams[2] ensures that the locations stream is correctly tested.


881-884: ****

This is related to the previous comment about ensuring proper initialization in mocked_init. Please refer to that comment.


976-979: ****

This comment duplicates earlier feedback regarding the mocked_init function. Please see the previous comments for details.


1100-1103: ****

Again, this is a duplicate of the previous comments about mocked_init. Ensure that all necessary initializations are performed.


1227-1230: ****

This comment repeats earlier suggestions about the use of mocked_init. Please review the previous comments.


1289-1292: ****

Same as before, please refer to the earlier comments regarding mocked_init and initialization.

unit_tests/sources/declarative/decoders/test_json_decoder.py (1)

57-57: Confirm performance impact of increasing lines_in_response

You've increased lines_in_response to 2_000_000 to simulate a large dataset for testing memory usage. This is great for stress-testing, but should we verify that this doesn't significantly increase the test execution time beyond acceptable limits? Wdyt?

@brianjlai brianjlai changed the title bug(concurrent-cdk): Move the grouping of concurrent and synchronous streams into the read and discover commands instead of when initializing the source fix(concurrent-cdk): Move the grouping of concurrent and synchronous streams into the read and discover commands instead of when initializing the source Dec 5, 2024
Copy link
Contributor

@pnilan pnilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Tested it with source-sendgrid. Resolved the issue.

@github-actions github-actions bot added the bug Something isn't working label Dec 5, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

133-133: Could we simplify the condition check?

Instead of if len(concurrent_streams) > 0:, we could use if concurrent_streams: for better readability and Pythonic style. Wdyt?

Suggested change:

-if len(concurrent_streams) > 0:
+if concurrent_streams:
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between f5da8f5 and 8351472.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

89-91: Consider removing self._state if it's unnecessary

I noticed the TODO comment about potentially removing state from initialization. Since the streams are now grouped during read(), and if self._state is no longer required elsewhere, could we remove self._state = state to simplify the code? Wdyt?

@brianjlai
Copy link
Contributor Author

https://github.com/airbytehq/airbyte/actions/runs/12188006089/job/34000075439

regression test counts were successful and expected diffs verified.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants