feat(Low-Code Concurrent CDK): Make SimpleRetriever thread-safe so that different partitions can share the same SimpleRetriever #185

Open
wants to merge 10 commits into
base: main

brianjlai
Contributor

@brianjlai brianjlai commented Dec 21, 2024

What

Right now, because the SimpleRetriever has internally managed, mutable state, multiple partitions running at the same time cannot share the same SimpleRetriever. Otherwise we might run into data loss because the state of the SimpleRetriever (things like the page number) is modified by different partitions. We've seen this problem arise in some connections (link issue), and we have temporarily solved it by having every Partition instantiate its own SimpleRetriever.
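
To make the failure mode concrete, here is a minimal, deterministic sketch of two partitions sharing one stateful paginator. `StatefulPager` and `read_partition` are hypothetical names invented for this illustration and are not CDK classes. Real interleaving would come from threads; it is simulated here by alternating two generators.

```python
from typing import Iterator, List, Tuple


class StatefulPager:
    """Hypothetical paginator that keeps the current page number as a field."""

    def __init__(self) -> None:
        self._page = 0

    def next_page(self) -> int:
        self._page += 1
        return self._page


def read_partition(pager: StatefulPager, name: str, pages: int = 3) -> Iterator[Tuple[str, int]]:
    """Yield the (partition, page) requests a partition would issue."""
    for _ in range(pages):
        yield name, pager.next_page()


shared = StatefulPager()
partition_a = read_partition(shared, "A")
partition_b = read_partition(shared, "B")

# Alternate between the two partitions, as concurrent workers might.
requests: List[Tuple[str, int]] = []
for request_a, request_b in zip(partition_a, partition_b):
    requests.extend([request_a, request_b])

pages_a = [page for name, page in requests if name == "A"]
pages_b = [page for name, page in requests if name == "B"]
print(pages_a, pages_b)
```

Each partition meant to read pages 1, 2, 3, but with the shared counter partition A requests 1, 3, 5 and partition B requests 2, 4, 6, so both skip pages.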

This, however, is very inefficient for a couple of reasons, such as needing to perform auth for every partition (link issue). It is also a hard blocker for AsyncRetriever, which must be shared across partitions in order to manage the shared job repository.

This PR removes the internal state of the SimpleRetriever and all of its dependencies, which was managed via a token field, and makes all methods stateless by passing the next_page_token as a parameter instead.

How

This PR makes the following changes for thread safety. The underlying implementations of the DefaultPaginator and all of the pagination strategies no longer have internal fields; instead, the required information is passed via parameters so that they are stateless:

  • The SimpleRetriever class had internal fields that were removed and replaced by local method variables like last_record and last_page_size. These are passed to and from methods local to the initial read_records() invocation.
  • The Paginator/DefaultPaginator interface no longer has internal state: the previous token is passed as a parameter to determine the next token to supply to the retriever. Methods now receive parameters like next_page_token because we need the prior value to compute the new value in a stateless manner. This will impact custom components, which will no longer adhere to the original interface. More on that below.
  • PaginationStrategy was also affected: its methods now need to receive additional parameters.
  • All forms of resetting PaginationStrategy and DefaultPaginator components are removed because there is no longer an internal state field.
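
As a rough illustration of the stateless shape described above, the following sketch shows a page-increment strategy with no mutable fields. The names initial_token, next_page_token, last_page_size, last_record, last_page_token_value and start_from_page come from this PR's discussion. The class itself, its simplified signature, and the handling of a missing previous token are assumptions made for the example and should not be read as the CDK implementation.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class StatelessPageIncrement:
    """Illustrative strategy with no mutable fields (not the CDK class)."""

    page_size: int
    start_from_page: int = 0

    @property
    def initial_token(self) -> Optional[Any]:
        return self.start_from_page

    def next_page_token(
        self,
        last_page_size: int,
        last_record: Optional[Mapping[str, Any]],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        # A short page means there is nothing left to fetch.
        if last_page_size < self.page_size:
            return None
        # Assumption: no previous token means the first page was just read.
        if last_page_token_value is None:
            return self.start_from_page + 1
        if not isinstance(last_page_token_value, int):
            raise ValueError(f"Expected an integer page token, got {last_page_token_value!r}")
        return last_page_token_value + 1


strategy = StatelessPageIncrement(page_size=2)
# Two partitions at different positions can use the same instance safely,
# because the position travels with the call rather than living on the object.
token_a = strategy.next_page_token(2, {"id": 2}, last_page_token_value=0)
token_b = strategy.next_page_token(2, {"id": 9}, last_page_token_value=4)
done = strategy.next_page_token(1, {"id": 10}, last_page_token_value=5)
```

Because the instance is frozen and every input arrives as an argument, there is nothing to reset between partitions; the starting position is simply read from initial_token.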

I also added the changes that allow AsyncRetriever to be instantiated within the concurrent framework, because I want to test some of this functionality with sendgrid, which needs these changes.

Other notes

  • This is technically considered a breaking change because it changes the Paginator and PaginationStrategy method interfaces to now include additional parameters and our internal code now passes extra parameters
  • The impact should be that only connectors that use custom components will be affected, and we expect them to fail loudly
  • There is some debate about whether this should be a minor or major version bump:
    • [Connectors with no custom components] - Not breaking
    • [Connectors with custom components] - Independently versioned and will fail loudly when bumping to latest CDK
      • source-freshdesk: custom pagination strategy
      • source-linkedin-ads: custom retriever
      • source-mixpanel: custom pagination strategy
      • source-monday: custom pagination strategy
      • source-zendesk-chat: custom pagination strategy
    • [manifest-only with custom components] - These carry the risk of getting upgraded and now failing. They need to be fixed post-release
      • source-the-guardian-api: custom pagination strategy
    • [Customer created connectors] - The builder doesn't currently support custom components until the in-progress project ships, so for the moment we're operating under the assumption that there are none. Brian to confirm w/ marketplace.
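
To illustrate what "fail loudly" could look like for a custom component, here is a small sketch. `LegacyCustomStrategy` and `call_like_new_framework` are hypothetical, and both signatures are simplified stand-ins rather than the real CDK interfaces. The only point is that Python raises a TypeError as soon as a caller passes an argument the older method does not accept.

```python
from typing import Any, Mapping, Optional


class LegacyCustomStrategy:
    """Hypothetical custom component written against an older, shorter signature."""

    def next_page_token(self, response: Mapping[str, Any], last_page_size: int) -> Optional[Any]:
        return response.get("next")


def call_like_new_framework(strategy: Any) -> Optional[Any]:
    """Stand-in for framework code that now also passes the previous token."""
    return strategy.next_page_token({"next": "abc"}, 2, last_page_token_value=None)


try:
    call_like_new_framework(LegacyCustomStrategy())
    failed_loudly = False
except TypeError as error:
    failed_loudly = True
    print(f"custom component rejected the new call: {error}")
```

The failure happens on the first paginated call rather than silently producing wrong pages, which is the behavior the notes above rely on.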

Because there really seems to be only one very low-usage community connector that would be affected by the breaking change, I am proposing we make this a minor change so that all of our connectors don't get pinned behind a breaking change and can continue to receive our future changes that will improve adoption of concurrency. I'm happy to pick up this discussion in the PR if there's opposition.

Summary by CodeRabbit

  • New Features

    • Enhanced concurrency handling for streaming data, supporting both synchronous and asynchronous stream types.
    • Improved pagination token management across various paginator classes.
  • Bug Fixes

    • Updated error handling to prevent potential deadlocks in concurrent processing.
    • Adjusted pagination logic to ensure correct token retrieval and state management.
  • Tests

    • Expanded test coverage for pagination strategies and concurrent sources.
    • Updated existing tests to align with new method signatures and behaviors.
    • Added new tests for SimpleRetriever to validate pagination scenarios.

for record in records_generator_fn(response):
    last_page_size += 1
    last_record = record
    yield record
Contributor Author

This part is tricky. Before, we managed _last_record and _last_page_size as fields on the SimpleRetriever and updated them from within _parse_response().

However, to make this stateless, I moved all of the assignment up to this level instead of yielding records and then returning a value at the end, which was messy. In my testing, this was equivalent to setting last_response first and then updating last_page_size and last_record as records are emitted. I just want to highlight this change, since it does subtly change the flow.
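
The flow described in this comment can be sketched as a generator that keeps the pagination inputs in local variables. Everything here other than the names last_page_size and last_record is hypothetical: `read_pages`, `fake_fetch`, `fake_next_token` and the `PAGES` fixture are invented for illustration and do not mirror the actual SimpleRetriever code.

```python
from typing import Any, Callable, Iterator, List, Mapping, Optional

Record = Mapping[str, Any]


def read_pages(
    fetch_page: Callable[[Optional[int]], List[Record]],
    next_token: Callable[[int, Optional[Record], Optional[int]], Optional[int]],
    initial_token: Optional[int],
) -> Iterator[Record]:
    """Yield records page by page, tracking pagination inputs in local variables."""
    token = initial_token
    while True:
        last_page_size = 0
        last_record: Optional[Record] = None
        for record in fetch_page(token):
            last_page_size += 1
            last_record = record
            yield record
        # The previous token is handed back in, so nothing is stored on an object.
        token = next_token(last_page_size, last_record, token)
        if token is None:
            return


# Hypothetical fake API: two full pages of size 2, then a short page.
PAGES = {0: [{"id": 1}, {"id": 2}], 1: [{"id": 3}, {"id": 4}], 2: [{"id": 5}]}


def fake_fetch(token: Optional[int]) -> List[Record]:
    return PAGES.get(token or 0, [])


def fake_next_token(size: int, record: Optional[Record], previous: Optional[int]) -> Optional[int]:
    return (previous or 0) + 1 if size == 2 else None


ids = [record["id"] for record in read_pages(fake_fetch, fake_next_token, 0)]
```

Since each call to `read_pages` owns its counters, two partitions can run this loop against the same fetcher and token function without interfering with each other.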

@@ -24,27 +24,32 @@ class Paginator(ABC, RequestOptionsProvider):
    """

    @abstractmethod
    def reset(self, reset_value: Optional[Any] = None) -> None:
Contributor Author

Resetting can be deprecated now that these classes are stateless. We do, however, need to add the ability to get the initial token value for different types of pagination strategies.

@@ -518,7 +525,7 @@ def _parse_records(
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
-    ) -> Iterable[StreamData]:
+    ) -> Iterable[Record]:
Contributor Author

This wasn't typed correctly, because _parse_response returns an iterable of records. This is a more accurate type.

@brianjlai brianjlai marked this pull request as ready for review December 27, 2024 05:40
@brianjlai brianjlai changed the title refactor: Make SimpleRetriever thread-safe so that different partitions can share the same SimpleRetriever refactor(low code concurrent cdk): Make SimpleRetriever thread-safe so that different partitions can share the same SimpleRetriever Dec 27, 2024
@brianjlai brianjlai changed the title refactor(low code concurrent cdk): Make SimpleRetriever thread-safe so that different partitions can share the same SimpleRetriever refactor(Low-Code Concurrent CDK): Make SimpleRetriever thread-safe so that different partitions can share the same SimpleRetriever Dec 27, 2024
Contributor

coderabbitai bot commented Dec 27, 2024

Walkthrough

The pull request introduces comprehensive modifications to pagination and retrieval mechanisms across multiple files in the Airbyte CDK. The changes primarily focus on enhancing the flexibility of pagination strategies, stream handling, and concurrency management. Key modifications include updating method signatures to support more dynamic token handling, removing reset methods, and refining how streams are grouped and processed in concurrent scenarios.

Changes

| File | Change Summary |
| --- | --- |
| airbyte_cdk/sources/declarative/concurrent_declarative_source.py | Removal of _retriever_factory method; updated _group_streams for AsyncRetriever support. |
| airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py | Added get_initial_token method; updated signatures for pagination methods to include next_page_token. |
| airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py | Updated path and next_page_token methods to include next_page_token parameter; added get_initial_token. |
| airbyte_cdk/sources/declarative/requesters/paginators/paginator.py | Removed reset method; added get_initial_token and updated method signatures for pagination. |
| airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py | Removed _initial_cursor and reset method; updated next_page_token signature. |
| airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py | Removed _offset attribute; updated next_page_token and reset methods. |
| airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py | Removed _page attribute; updated pagination logic in next_page_token. |
| airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py | Updated next_page_token signature; removed reset method. |
| airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py | Updated next_page_token signature; removed reset method. |
| airbyte_cdk/sources/declarative/retrievers/simple_retriever.py | Updated methods for pagination handling and record parsing; refined return types. |
| unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py | Updated tests to reflect changes in last_record structure; removed test_reset_with_initial_token. |
| unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py | Updated tests to include next_page_token parameter in method calls. |
| unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py | Updated next_page_token call to include additional parameter. |
| unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py | Updated test parameters and assertions to reflect changes in paginator logic; removed test_offset_increment_reset. |
| unit_tests/sources/declarative/requesters/paginators/test_page_increment.py | Updated test parameters to include last_page_token_value; removed reset tests. |
| unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py | Updated tests for next_page_token method; removed reset tests. |
| unit_tests/sources/declarative/retrievers/test_simple_retriever.py | Added tests for pagination strategies; updated assertions for pagination handling. |
| unit_tests/sources/declarative/test_concurrent_declarative_source.py | Updated tests for stream grouping logic; modified assertions for stream counts. |
| unit_tests/sources/declarative/test_manifest_declarative_source.py | Updated mock calls in tests to reflect changes in method signatures. |
| airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py | Updated constructor to use direct Retriever instance instead of factory. |
| unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py | Removed setUp method; localized mock instantiation within tests. |

Possibly related PRs

Suggested labels

bug

Suggested reviewers

  • maxi297
  • tolik0

What do you think about these modifications? Do they align with your expectations for improving the pagination and retrieval mechanisms? 😊


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (48)
airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py (6)

167-167: Headers retrieval
This approach mirrors request params. WDYT about merging similar logic into a shared helper function?


176-176: Body data retrieval
Sticking to the same pattern is consistent. Would you like to store the body data in a local variable for debugging, wdyt?


187-199: Token injection
We inject the page token if it exists. Looks correct. Maybe log a warning if token is unexpectedly None, wdyt?


231-233: Reset page count
get_initial_token() re-initializes _page_count. Might it be clearer if _page_count was renamed to _remaining_pages or similar, wdyt?


250-251: Path delegation
We forward path calls as well. WDYT about caching the path for debug or logging?


Line range hint 287-291: Fallback retriever factory
We switch to _retriever_factory() if it's not async. Maybe we can handle concurrency-unsafe retrievers here more directly, wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)

197-201: Retriever type check
We check if the retriever is SimpleRetriever or AsyncRetriever. Perhaps unify both under a common interface to avoid conditionals, wdyt?


283-286: Factory method
This closure returns a shared async_retriever. Do we want to add doc comments clarifying concurrency usage, wdyt?


287-291: Fallback retriever factory
We delegate to _retriever_factory() if not async. Maybe unify the logic so a single method picks the right path automatically, wdyt?


296-296: Wrapping partition generator
This code sets up the DeclarativePartitionFactory. A docstring note about concurrency constraints might help, wdyt?

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (9)

255-255: Record selection
We directly yield from record_selector.select_records. This is nice and streamlined. Should we log the count of records, wdyt?


330-330: _read_pages method
You're using records_generator_fn as a callback. Would you define a type alias for clarity, wdyt?


335-338: Initial token retrieval
We wrap initial_token into a dict if it's not None. Using a custom typed structure might be clearer, wdyt?


341-347: Tracking last record
We store last_page_size and last_record to pass along. Maybe add a debug log here, wdyt?


369-369: _read_single_page
This approach focuses on a single page at a time. Would you unify it with _read_pages to reduce duplication, wdyt?


390-390: Marking completion
We set {FULL_REFRESH_SYNC_COMPLETE_KEY: True} to mark the end of sync. Any interest in logging completion, wdyt?


392-400: Next page token fallback
If _next_page_token is None, we default to a "sync complete" token. Would short-circuiting if last_page_size is 0 make sense, wdyt?


528-528: Accurate method signature
Updating _parse_records clarifies types. WDYT about adding a brief docstring?


572-572: Log formatter usage
We pass a custom log formatter for test reads, which is helpful. Would you also log the response status code, wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py (1)

29-39: Extended next_page_token signature
We added last_page_token_value to handle stateless pagination. Could we enhance the docstring to clarify how this differs from the token in response, wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py (1)

65-70: next_page_token returning an empty dictionary.
Currently, this returns an empty dictionary rather than None. Would returning None for a no-pagination scenario be more intuitive, or do you prefer an empty dictionary for consistency with other pagination strategies? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/paginator.py (2)

34-38: next_page_token signature updated.
Adding last_page_token_value parameter gives more control over pagination status in concurrent settings. Is it worth explicitly documenting concurrency patterns here, or is that best kept in a separate concurrency guide? wdyt?


46-46: Expanded docstring to mention last_page_token_value.
Thanks for making this param clearer. Might we also add an example usage snippet, or is that unnecessary here? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py (1)

53-53: Comment about reverse order.
Your reasoning here is quite interesting. Would it help to clarify why many APIs return data in descending order, so other developers can better understand this assumption? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py (2)

45-45: Returning start_from_page as the initial_token.
This is a clear way to define the first page. Would you consider logging or validating that start_from_page is a positive integer, just to catch configuration errors early? wdyt?


65-65: Validation for integer page tokens.
Throwing a ValueError here is good defensive coding. We might consider logging the contents of last_page_token_value for easier debugging, wdyt?

unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py (1)

48-54: Consider documenting the new parameter in a docstring.
Defining the purpose and expected types of last_page_token_value could help maintain clarity for future contributors. wdyt?

unit_tests/sources/declarative/requesters/paginators/test_page_increment.py (1)

17-39: Expanded parameter list
Including last_page_token_value allows the paginator to be stateless. This is a beneficial change for concurrency. Did we consider adding a scenario where last_page_token_value is present but doesn't match the expected next page token? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py (2)

59-64: Clear documentation for initial_token.
Marking None as the default clarifies that the strategy starts based on external references (state or the first response). Maybe highlight in a class docstring that there is no explicit “cursor reset”? wdyt?


67-71: Updated signature with last_page_token_value.
This keeps the interface consistent with the other pagination strategies. Any thoughts on adding a small log message if last_page_token_value is ignored in this strategy? wdyt?

unit_tests/sources/declarative/test_concurrent_declarative_source.py (3)

654-655: Double-check concurrency count?
Lines 654–655 comment on the new async job stream, and we ensure the test for six concurrent streams. Would you like to also verify each stream type to confirm that all newly added streams are indeed part of this count? wdyt?


662-662: Clean enumeration of concurrent streams
You’re unpacking each stream to a unique variable, which helps with clarity. Perhaps confirm in a follow-up test that each element is indeed configured as expected (e.g., correct path, paginator, etc.)? wdyt?


678-678: Ensure synchronous streams are declared
You assert there is exactly one synchronous stream. Would you like to add a short docstring indicating why this particular set of streams runs synchronously, for future clarity? wdyt?

unit_tests/sources/declarative/test_manifest_declarative_source.py (2)

1368-1368: Call signature consistency
The call signature for _fetch_next_page includes {}, {}, and None. Would you prefer stating parameters with explicit naming for clarity in future test maintenance? wdyt?


1538-1545: Refining multi-page calls
Here, we see multiple calls to _fetch_next_page({…}) with next_page_token parameters across lines 1538–1545. Would you be open to verifying that each new call is passing along the correct updated token from the prior response, especially if multiple tokens are chained? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py (2)

70-74: Great job passing in last_page_token_value.
This parameterization is a clean, stateless approach for handling page tokens. Do you think a more descriptive name (e.g., previous_offset) would enhance clarity, wdyt?


84-94: Validating the page token is an excellent safeguard.
Raising a ValueError for non-integer tokens will prevent subtle pagination bugs. Have you considered using a custom exception class to allow further debugging or logging, wdyt?

unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py (1)

211-216: Explicitly passing None for next_page_token is clear.
This makes it obvious we're starting from the initial page. Would you consider adding a docstring clarifying that passing None indicates a “fresh start,” wdyt?

unit_tests/sources/declarative/retrievers/test_simple_retriever.py (10)

6-7: Nice usage of partial and type hints.
They improve code readability. Would you like to also consider a type alias for Mapping[str, Any] for further clarity, wdyt?


29-32: Imports for CursorPaginationStrategy and PageIncrement look good.
They align with the new stateless approach. Any thoughts on grouping these pagination strategies under a single test suite for maintainability, wdyt?


40-40: TypeTransformer usage
It's nice to see better transformations for result data. Would you consider adding tests specifically for transformation logic, wdyt?


62-62: Defaulting initial token to None is consistent.
This ensures that calling code clearly manages the start state. Have you considered a sentinel value instead of None for additional clarity, wdyt?


66-66: Returning empty headers by default
It’s straightforward. Might it be useful to log if there are no headers, or do you prefer silent defaults, wdyt?


118-121: Testing _next_page_token with these parameters
This ensures coverage for concurrency scenarios. Would you add a scenario for multi-page transitions, wdyt?


394-394: paginator.get_initial_token is mocked out
Skipping calls indicates you’re ensuring no resets happen if the stream is complete. Maybe a docstring clarifies this logic, wdyt?


668-668: Implementing _paginator_path override
Nice approach to unify path logic from both the requester and paginator. Have you considered a fallback or default path scenario, wdyt?


848-850: New tests for logging
You confirm that your custom log formatter is invoked. A quick question: do you see a need for different logging behaviors based on environment (dev vs. prod), wdyt?


942-1057: test_retriever_is_stateless
Verifying no internal state is definitely crucial for concurrency. Great test approach! Could you expand coverage to confirm correct behavior across different page size inputs, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2671c24 and c704a22.

📒 Files selected for processing (19)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py (7 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py (2 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/paginator.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py (2 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py (2 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (10 hunks)
  • unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py (2 hunks)
  • unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py (3 hunks)
  • unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py (1 hunks)
  • unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py (1 hunks)
  • unit_tests/sources/declarative/requesters/paginators/test_page_increment.py (1 hunks)
  • unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py (2 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (10 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (3 hunks)
  • unit_tests/sources/declarative/test_manifest_declarative_source.py (4 hunks)
🔇 Additional comments (51)
airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py (9)

115-123: Use caution with the new get_initial_token approach
We've introduced get_initial_token() returning self.pagination_strategy.initial_token. This is consistent with the new stateless approach. Would you consider clarifying the docstring about checkpointing, wdyt?


126-139: Focus on next_page_token usage
The parameter last_page_token_value is a nice addition to ensure continuity between requests. Maybe we could handle the scenario if there's a mismatch between last_page_token_value and response, wdyt?


143-147: Path rewriting logic
Replacing the base URL with an empty string is clever for path extraction. If the URL doesn't contain the base substring, do we handle it gracefully, wdyt?


158-158: Good use of _get_request_options
This usage unifies request parameter handling. WDYT about logging if we skip certain params?


185-185: JSON body retrieval
Following the same approach as the other request options. Looks great!


236-240: Decorator next_page_token
We skip pagination after reaching the page limit. Do you want to log how many pages were actually read, wdyt?


246-248: Delegation
We delegate the next_page_token to the decorated paginator. That seems convenient.


Line range hint 277-286: AsyncRetriever approach
We share the async_retriever across partitions. Should we add a test for concurrency safety, wdyt?


Line range hint 296-296: Partition generator
We pass the retriever factory through. Everything looks consistent.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

221-225: Async job stream detection
We detect AsyncRetriever by type. Would you consider validating that the config indeed supports async flows, wdyt?

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (8)

9-9: Import statement update
We extend typing usage to include more types. Maybe consider forward refs if needed, wdyt?


236-242: Paginator path usage
We pass the next_page_token to _paginator.path(), ensuring the path can reflect the correct token. Maybe add a fallback if _paginator.path() is None, wdyt?


273-279: Enhanced next_page_token parameters
Including last_page_size, last_record, and last_page_token_value makes pagination more robust. Would you handle partial pages differently, wdyt?


287-292: Delegate to the pagination strategy
We pass all needed params to pagination_strategy.next_page_token. WDYT about logging the token for debug?


352-360: Deriving next page token
We pass the old token as last_page_token_value. The approach looks consistent.


373-379: Initial token from state
We try stream_state.get("next_page_token") first, or call get_initial_token(). What if the stored token is invalid, wdyt?


380-387: Single-page record loop
The logic to track last_record remains symmetrical with multi-page usage. Nicely done!


435-437: Full refresh early exit
If the prior job was successful, we skip new fetching. This is an efficient optimization.

unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py (1)

12-12: Additional argument
We pass None as the fourth parameter to next_page_token. That aligns with the updated signature. All good, wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py (1)

7-7: Typing improvements
We import Mapping for better type clarity. Would you consider using collections.abc.Mapping for Python 3.9+, wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py (2)

22-22: Method signature updated to accept a page token.
It looks like this method now aligns with the new interface that requires a next_page_token. Since this class never paginates, returning None seems perfectly logical. Do you agree, or do you think returning an empty string might be clearer for some APIs? wdyt?


61-62: Initial token returning None.
This clearly communicates that there is no established starting token. Seems consistent with the “no pagination” concept. Any concerns about potential confusion for newcomers? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/paginator.py (2)

27-29: New get_initial_token method.
This method clarifies how to retrieve a starting token for paginators. It’s a welcome addition that keeps classes stateless. Do you think we need more documentation examples for typical usage? wdyt?


52-52: path now uses next_page_token.
Providing the next_page_token to the path method is a strong step toward full statelessness. Do you see any value in also accepting other optional context values here, or is that beyond scope? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py (2)

47-51: next_page_token signature supports last_page_token_value.
This addition looks consistent with the broader changes. Might it be beneficial to log or validate last_page_token_value inside this decorator if it doesn’t meet certain conditions? wdyt?


57-59: Delegate’s next_page_token call.
Forwarding the parameters to _delegate is straightforward. Any thoughts on adding specialized post-processing before returning? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py (3)

49-53: next_page_token signature now includes last_page_token_value.
This aligns well with the concurrency-friendly approach. Are you considering adding debug logs if last_page_token_value is unexpectedly None? wdyt?


58-63: Handling None page token.
Great approach assuming the first request was already made. However, do you foresee any case where None might appear mid-stream in edge scenarios (e.g., errors)? Would a fallback or error help with diagnostics? wdyt?


68-68: Incrementing the page token.
Simply returning last_page_token_value + 1 is very straightforward. Is there any edge case where the page token could approach a maximum integer limit, or do you assume that’s improbable for typical usage? wdyt?
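
As a rough sketch of the three page-increment comments above, the function below derives the next page number only from its arguments. The name `next_page` and the `start_from_page` and `page_size` parameters are assumptions made for illustration; the real strategy's signature also takes the response and the last record.

```python
from typing import Optional


def next_page(
    last_page_size: int,
    last_page_token_value: Optional[int],
    start_from_page: int = 0,
    page_size: Optional[int] = None,
) -> Optional[int]:
    # A short page signals the end of the data.
    if page_size is not None and last_page_size < page_size:
        return None
    # None means the first request carried no token, so the page just read
    # was start_from_page and the next one follows it.
    if last_page_token_value is None:
        return start_from_page + 1
    if not isinstance(last_page_token_value, int):
        raise ValueError(
            f"Page token {last_page_token_value!r} "
            f"(type: {type(last_page_token_value).__name__}) is not an integer"
        )
    return last_page_token_value + 1
```

On the maximum-integer question: Python integers have arbitrary precision, so `last_page_token_value + 1` does not overflow.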

unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py (3)

17-44: Nice parameterization for last_page_token_value.
It looks consistent with the new signature in the OffsetIncrement class. Adding this parameter keeps the tests aligned with the new stateless approach. Just to be sure, do we need an additional test to confirm that a None value for last_page_token_value behaves correctly? wdyt?


64-66: Good use of the new signature
The call to next_page_token with last_page_token_value shows that the code is consistent with the recently introduced parameter.


69-73: Verifying statelessness
The repeated invocation of next_page_token helps confirm that no internal state depends on a prior call’s side effects. This is a valuable test. Great work!
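
A minimal sketch of what such a statelessness check looks like, using a hypothetical simplified `StatelessOffset` class rather than the real `OffsetIncrement`:

```python
from typing import Optional


class StatelessOffset:
    """Hypothetical simplified offset strategy; it holds configuration only."""

    def __init__(self, page_size: int) -> None:
        self.page_size = page_size

    def next_page_token(
        self, last_page_size: int, last_page_token_value: Optional[int] = None
    ) -> Optional[int]:
        if last_page_size < self.page_size:
            return None
        # None is treated as offset 0, i.e. the first page was just read.
        return (last_page_token_value or 0) + last_page_size


strategy = StatelessOffset(page_size=5)
first = strategy.next_page_token(last_page_size=5, last_page_token_value=10)
second = strategy.next_page_token(last_page_size=5, last_page_token_value=10)
```

With an internal `_offset` field the second call would have advanced again; here both calls see the same inputs and return the same token.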

unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py (2)

89-91: Ensuring concurrency safety
The updated call to next_page_token with last_page_token_value is in line with the new pattern. Do we want to add a separate test explicitly covering concurrency scenarios? wdyt?
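
One possible shape for a concurrency test, sketched with hypothetical names (`SharedOffsetStrategy`, `paginate`) and a thread pool from the standard library. It assumes each partition keeps its own token and only the strategy object is shared.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional


class SharedOffsetStrategy:
    """Hypothetical stateless strategy shared by every partition."""

    def __init__(self, page_size: int) -> None:
        self.page_size = page_size

    def next_page_token(
        self, last_page_size: int, last_page_token_value: Optional[int]
    ) -> Optional[int]:
        if last_page_size < self.page_size:
            return None
        return (last_page_token_value or 0) + last_page_size


def paginate(strategy: SharedOffsetStrategy, total_records: int) -> List[int]:
    """Return the offsets requested for a partition holding total_records records."""
    offsets: List[int] = []
    token: Optional[int] = None  # per-partition state lives here, not on the strategy
    while True:
        offset = token or 0
        offsets.append(offset)
        last_page_size = min(strategy.page_size, total_records - offset)
        token = strategy.next_page_token(last_page_size, token)
        if token is None:
            return offsets


shared = SharedOffsetStrategy(page_size=10)
totals = [25, 5, 40, 12]
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda total: paginate(shared, total), totals))
```

Each partition ends up with the offsets it would get when run alone, which is the property a shared paginator needs.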


105-106: Consistent parameter usage
The additional None parameter here aligns with the refactored interface. Maintaining consistency across these test calls strengthens the overall verification. Nicely done!

unit_tests/sources/declarative/requesters/paginators/test_page_increment.py (4)

44-50: Straightforward function signature
Declaring these parameters explicitly clarifies the test logic. This approach of passing everything in makes each test scenario very transparent.


55-55: Verifying start_from_page
Asserting against paginator_strategy.start_from_page instead of a private attribute is more robust and future-proof. Good practice!


63-65: Checking next_page_token
Using a real HTTP response ensures we’re testing the logic with actual request/response flows. This is a strong setup.


68-72: Confirming repeated calls remain consistent
It’s great that we verify calling next_page_token multiple times yields the same token, preventing internal state issues.

unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)

674-675: Validate concurrency configuration
Asserting the type and name ensures the stream is correctly identified as async. Would you consider adding a check that ensures the async_job_stream is truly non-blocking or concurrency-ready, if relevant? wdyt?


1461-1464: Confirm concurrency vs synchronous grouping
These comments and assertions verify that certain streams are concurrent vs. synchronous. Would you like to add a test scenario ensuring the boundary condition (e.g., if a stream gets incremental + partitioning, it moves to synchronous) is also tested? wdyt?

unit_tests/sources/declarative/test_manifest_declarative_source.py (2)

1281-1281: Confirm _fetch_next_page calls
You’re calling _fetch_next_page({}, {}, None) once. Do you anticipate a scenario where extra parameters (like next_page_token) might appear even for no-pagination tests? wdyt?


1452-1452: Flatten fields test
The test checks a single call to _fetch_next_page({}, {}, None). Is that sufficient to verify all flattened scenarios, or do we need more coverage for multi-page results? wdyt?

airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py (1)

66-66: Removing reliance on _offset is a nice simplification.
By returning 0 for the initial token, you ensure pagination starts at zero when inject_on_first_request is True. This is straightforward and improves clarity, wdyt?

unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py (1)

15-15: Switching last_record to a Record instance is a solid improvement.
It helps ensure consistent typing and usage. Do you plan to adopt this Record pattern in other pagination tests for consistency, wdyt?

Also applies to: 83-83

unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py (2)

284-285: Using paginator.next_page_token(...) with None ensures correct interpolation.
This is consistent with your stateless design. Do you foresee any edge cases where None might conflict with a default token, wdyt?


335-338: Retrieving the initial token from paginator is well-structured.
It clearly separates initialization logic from normal pagination. Perhaps re-check whether you need more test coverage for different initial tokens, wdyt?

unit_tests/sources/declarative/retrievers/test_simple_retriever.py (5)

16-17: Using JsonDecoder consistently is a good approach.
It keeps the decoding strategy uniform across tests. Do you plan to parameterize the decoder if you introduce other formats, wdyt?


78-81: Storing last_page_size, last_record, and last_page_token_value
Explicitly passing these around is great for concurrency. Does the test address edge cases like partial pages or empty records, wdyt?


419-419: get_initial_token.assert_not_called()
This is a valid assertion to confirm that no additional pagination logic was triggered. If you expand concurrency scenarios, remember to re-check these call counts, wdyt?


851-890: test_retriever_last_page_size_for_page_increment
This is a good addition for verifying multi-page data retrieval. Do you also want to test if an unexpected smaller page size triggers a stop condition, wdyt?


892-940: test_retriever_last_record_for_page_increment
You’re validating that the new CursorPaginationStrategy handles last_record properly. Would it be worthwhile to test merging logic if multiple records claim to be the “last” record, wdyt?

@brianjlai brianjlai changed the title refactor(Low-Code Concurrent CDK): Make SimpleRetriever thread-safe so that different partitions can share the same SimpleRetriever feat(Low-Code Concurrent CDK): Make SimpleRetriever thread-safe so that different partitions can share the same SimpleRetriever Dec 27, 2024
Contributor

@artem1205 artem1205 left a comment

LGTM!
Tested locally with Amazon Ads in branch artem1205/source-amazon-ads-migrate-low-code-async

@brianjlai
Contributor Author

See airbytehq/airbyte#49812 (comment) for notes on regression test results on source-sendgrid. The full refresh, incremental, and async contacts streams have expected mismatches.

@brianjlai
Contributor Author

source-the-guardian-api fails acceptance tests, but I also believe this is expected because it uses a custom component CustomPageIncrement which no longer adheres to the interface as noted in the PR description. Testing locally, there is the incoming parameter mismatch, but after fixing the custom component the connector syncs streams as expected.

I will fix this in a follow-up PR, but until then this will fail our CI checks, so we need to manually override them to merge this.

@natikgadzhi
Contributor

source-the-guardian-api fails acceptance tests, but I also believe this is expected because it uses a custom component CustomPageIncrement which no longer adheres to the interface as noted in the PR description.

We can also ask @ChristoGrab or community devs to take a glance, but overall, yeah, that should not block you from merging!

What would be helpful is a list of all connectors that have custom paginators, so we can prioritize them for community devs.

Otherwise LGTM, let's get this in!

@brianjlai
Contributor Author

brianjlai commented Jan 6, 2025

What would be helpful is a list of all connectors that have custom paginators, so we can prioritize them for community devs.

I already included the list of affected connectors in the description under Other notes.

And I've already fixed and tested source-the-guardian-api in airbytehq/airbyte#50855

@maxi297
Contributor

maxi297 commented Jan 7, 2025

To add on the description of this PR:

The issue was first seen when doing some dev releases of source-klaviyo for Tydo on this connection if I'm not mistaken (I couldn't confirm because the logs are enormous and I can't load them easily).

I tried to find the issue in Sentry and couldn't find it (most probably because the syncs were enormous and would not finish, so the platform would not report on these). However, I've seen an occurrence of this here (link to the sync). This seems very niche, but I wanted to raise awareness of it in case it means my original diagnosis was wrong or that we have another issue. However, since it's very scoped in time and affects multiple connections, I would assume it is only a blip from Klaviyo's API.

Contributor

@maxi297 maxi297 left a comment

I've checked the tests only briefly, but the rest looks good. I'm quite happy with this change. I'm just curious about the _retriever_factory, on which I would like some clarification in case I'm missing something.

I really like the cleaning it provides like this and this ❤️

@@ -268,15 +274,26 @@ def _group_streams(
        elif (
            is_substream_without_incremental or is_without_partition_router_or_cursor
        ) and hasattr(declarative_stream.retriever, "stream_slicer"):
            if is_async_job_stream:
Contributor

Now that we have made the components thread safe, is there a reason to keep the _retriever_factory? If the reason is to limit the blast radius of possible issues, I'm fine with this but I think it would still be nice to have a plan to share the same logic for both async retriever, http retriever and (eventually) custom retriever

Contributor Author

good point. i think when implementing i'd just refactored and left the existing code as is, but I'm not opposed to just yanking out the retriever_factory in its entirety. it was more of an oversight. i'll work on taking it out, and I think it'll also help make the DeclarativePartitionFactory a bit simpler too, since it just needs to take in a retriever and not a factory
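
A rough sketch of the simplification discussed here, with hypothetical stand-in classes (`FakeRetriever`, `Partition`, `PartitionFactory`) rather than the real `DeclarativePartitionFactory`: the factory holds one retriever instance and every partition it creates reuses it.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Mapping


class FakeRetriever:
    """Hypothetical stateless retriever: output depends only on the slice passed in."""

    def read_records(self, stream_slice: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        yield {"partition": stream_slice["id"]}


@dataclass
class Partition:
    retriever: FakeRetriever
    stream_slice: Mapping[str, Any]

    def read(self) -> List[Mapping[str, Any]]:
        return list(self.retriever.read_records(self.stream_slice))


@dataclass
class PartitionFactory:
    retriever: FakeRetriever  # a retriever instance, not a factory callable

    def create(self, stream_slice: Mapping[str, Any]) -> Partition:
        return Partition(self.retriever, stream_slice)


factory = PartitionFactory(FakeRetriever())
partitions = [factory.create({"id": i}) for i in range(3)]
```

Sharing is only safe because the retriever keeps no per-read state; anything that varies per partition travels in `stream_slice`.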

    return 0 + last_page_size
elif not isinstance(last_page_token_value, int):
    raise ValueError(
        "The page token for a OffsetIncrement pagination strategy must be an integer"
Contributor

nit: can we add the value and which type it was in the error message?

Contributor Author

yep good idea. fixing

 else:
-    self._offset += last_page_size
-    return self._offset
+    next_page_token_value = last_page_token_value + last_page_size
Contributor

nit: I'm not sure if this applies to Python as well

Contributor Author

yeah this applies. might've been a remnant of something else i wrote, but it should just return the value. nice catch!

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

Line range hint 19-24: Update the docstring to reflect the change from retriever_factory to retriever

The docstring mentions that DeclarativePartitionFactory takes a retriever_factory, but the constructor now accepts a retriever directly. Updating the docstring will prevent confusion. Wdyt?

Apply this diff to update the docstring:

             """
-            The DeclarativePartitionFactory takes a retriever_factory and not a retriever directly. The reason is that our components are not
-            thread safe and classes like `DefaultPaginator` may not work because multiple threads can access and modify a shared field across each other.
-            In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe.
+            The DeclarativePartitionFactory now takes a retriever directly since components are thread-safe, simplifying the processing.
             """
🧹 Nitpick comments (6)
airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py (2)

70-74: The signature change looks good! Would you consider adding param docs?

The new last_page_token_value parameter enables stateless operation. Perhaps we could add a docstring to explain its purpose and type constraints? wdyt?

Something like:

def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Calculate the next page token based on the previous token and page size.
    
    Args:
        response: The HTTP response from the last request
        last_page_size: Number of records in the last page
        last_record: The last record from the previous page
        last_page_token_value: The token used for the previous page. Must be an integer if not None.
    
    Returns:
        The next page token or None if no more pages
    
    Raises:
        ValueError: If last_page_token_value is neither None nor an integer
    """

84-93: The logic looks solid! One tiny suggestion for the error message.

The error handling is thorough, and I see you've included the actual value in the error message (nice!). Would you consider also including the actual type to make debugging even easier? wdyt?

Something like:

-                f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer"
+                f"Last page token value {last_page_token_value} (type: {type(last_page_token_value).__name__}) for OffsetIncrement pagination strategy was not an integer"
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

192-197: Consider simplifying conditionals by assigning retriever_type to a variable

Assigning retriever_type to a variable would improve code readability and prevent repetitive dictionary accesses. Wdyt?

Apply this diff to implement the suggestion:

+                    retriever_type = name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                     if isinstance(declarative_stream, DeclarativeStream) and (
-                        name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] == "SimpleRetriever"
-                        or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] == "AsyncRetriever"
+                        retriever_type == "SimpleRetriever" or retriever_type == "AsyncRetriever"
                     ):

216-220: Remove unused variable is_async_job_stream

The variable is_async_job_stream is assigned but not used later in the code. Removing it would clean up the code. Wdyt?

Apply this diff to remove the unused variable:

-                    is_async_job_stream = (
-                        name_to_stream_mapping[declarative_stream.name].get("retriever", {}).get("type")
-                        == "AsyncRetriever"
-                    )

243-245: Consider rephrasing the comment to avoid using "hack"

Using the term "hack" in code comments might be less professional. Maybe rephrase it to enhance clarity and professionalism. Wdyt?

unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)

31-33: Consider using a setUp method to reduce code duplication

The instantiation of retriever and message_repository is repeated in multiple test methods. Using a setUp method or a helper function could reduce duplication and improve maintainability. Wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c704a22 and 02306d5.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (6 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py (2 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py (1 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2 hunks)
  • unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py (7 hunks)
  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py
  • unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py (1)

66-66: LGTM! Clean and simple.

The hardcoded return value of 0 for initial_token when inject_on_first_request is True is a clean way to handle the first page without maintaining internal state.

Comment on lines +243 to +251
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
# still rely on a DatetimeBasedCursor that is properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)
retriever.cursor = None

Contributor

⚠️ Potential issue

Clarify the intention of setting retriever.cursor to None after initializing it

Setting retriever.cursor to None immediately after initializing it with set_initial_state might negate the purpose of the initialization. Is this the intended behavior? Wdyt?
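
One reading of the quoted code comment is that other components hold their own reference to the same cursor object, in which case clearing the retriever's attribute would not undo the initialization. The sketch below shows that Python behaviour with hypothetical stand-in classes (`Cursor`, `StopCondition`, `Retriever`); whether the real components are wired this way is an assumption.

```python
from typing import Any, Mapping, Optional


class Cursor:
    def __init__(self) -> None:
        self.state: Mapping[str, Any] = {}

    def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
        self.state = dict(stream_state)


class StopCondition:
    """Hypothetical component that keeps its own reference to the cursor."""

    def __init__(self, cursor: Cursor) -> None:
        self.cursor = cursor


class Retriever:
    def __init__(self, cursor: Optional[Cursor]) -> None:
        self.cursor = cursor


cursor = Cursor()
stop_condition = StopCondition(cursor)
retriever = Retriever(cursor)

if retriever.cursor:
    retriever.cursor.set_initial_state(stream_state={"updated_at": "2024-01-01"})
# Rebinds only the retriever's attribute; the Cursor object itself is untouched.
retriever.cursor = None
```

After this runs, `stop_condition.cursor` still carries the initial state even though `retriever.cursor` is `None`.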

Labels
enhancement New feature or request
4 participants