-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Async Retriever
change url path for download retriever
#192
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Async Retriever
change url path for download retriever
@@ -189,7 +189,8 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: | |||
for url in self.urls_extractor.extract_records( | |||
self._polling_job_response_by_id[job.api_job_id()] | |||
): | |||
stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={}) | |||
stream_slice = job.job_parameters() | |||
stream_slice.extra_fields.update({"url": url}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have mixed feelings about having to make the stream_slice's extra_fields
mutable. Even though in this case it might be fine, I like that in our code the contract is that a given slice's underlying mappings won't change. I think we should try to keep the interface as is where extra_fields
is just a Mapping
One way we could do this is by constructing a new slice by copying things into a new slice
job_slice = job.job_parameters()
StreamSlice(
partition=job_slice.partition,
cursor_slice=job_slice.cursor_slice,
extra_fields={**job_slice.extra_fields, "url": url},
)
And the other question, why was the URL being in the partition creating issues for the state_manager, I'm not fully against it being an extra field, but I feel like the partition is the natural place for a usability standpoint since that is what we're partitioning on. So I'd want to understand why we need to move it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep the interface as is where extra_fields is just a Mapping
agreed, reverted to Mapping
And the other question, why was the URL being in the partition creating issues for the state_manager, I'm not fully against it being an extra field, but I feel like the partition is the natural place for a usability standpoint since that is what we're partitioning on. So I'd want to understand why we need to move it
Long story short: url
field in partition changes the way we store (and serialize) StreamSlice
keys.
- in amazon Ads we use incremental sync, so we have the following slice that should be passed to the
download_retriever
( I redefined it at line 198 for testing purposes)
- during the read operation, we try to
observe
(L153), but theself._cursor_per_partition
has only initial partition{"parent_slice":{},"profileId":1}
- and therefore cannot be updated (KeyError) by
self._to_partition_key(stream_slice.partition)
That is why I decided to move url field to extra_fields => to exclude it from serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah okay thank you for the clear explanation. It's a shame that we're left with using the extra_fields
as a means of access to avoid it being included. Not something that I am completely opposed to so from a code perspective things look good to me so I'll approve
But because @maxi297 implemented most of the original Async job work, I think we should just get his eyes on the final design as well. Might be worth waiting the extra day for him when he is back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the very clear explanation @artem1205!
Yes, this is indeed a problem. I'm not opposed to the solution but I'm wondering if we could do something more explicit and aligned with how stream_slices work. The issue is that the URL doesn't have anything to do with the stream slice and this seems more like a hack that could have unintended issues later on. We wanted to fix this before putting this in the Connector Builder. I'm wondering if we have started tackling this as part of this project @bazarnov
So let me add some background: The reason we use the stream slice in that part of the code is because we didn't have any other easy mean of doing interpolation on variables like url
. The same is true for stream_slice['create_job_response']
. I don't remember where we allow for interpolation on some variables, but I know it is restricted to few variables like config
, parameters
, stream_state
, stream_slice
, next_page_token
, stream_interval
and stream_partition
. If we could add other variables to that, I think it would be more explicit to have HttpRetriever.read_records
have the ability to take variables to be interpolated. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to go forward with the current solution, I'm fine with this but know that it'll probably change once we have a better way to interpolate
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Async Retriever
change url path for download retrieverAsync Retriever
change url path for download retriever
📝 WalkthroughWalkthroughThis pull request introduces modifications across multiple files in the Airbyte CDK, focusing on enhancing record processing and stream slice handling. The changes primarily involve updating the Changes
Sequence DiagramsequenceDiagram
participant Factory as ModelToComponentFactory
participant Retriever as AsyncRetriever
participant Selector as RecordSelector
Factory->>Retriever: create_async_retriever()
Retriever->>Selector: Initialize with transformations
Selector-->>Retriever: Configured selector
Possibly related PRs
Suggested labels
Suggested reviewers
Hey there! 👋 I noticed these changes look quite interesting. Would you like me to elaborate on any specific aspect of the modifications? Wdyt about the sequence of changes? 🤔 Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/types.py (1)
156-157
: Should we consider extending the docstring for clarity?
Currently, the method is straightforward, but it might help future readers if we explained that aStreamSlice
is considered truthy whenever its main or extra fields are non-empty. wdyt?unit_tests/sources/declarative/requesters/test_http_job_repository.py (1)
87-87
: Could we safeguard against missing 'url' inextra_fields
?
When referencing{{stream_slice.extra_fields['url']}}
, a KeyError could occur if'url'
is absent. Would it make sense to provide a default or fail gracefully? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/http_job_repository.py
(1 hunks)airbyte_cdk/sources/types.py
(1 hunks)unit_tests/sources/declarative/requesters/test_http_job_repository.py
(1 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)
192-197
: Any concerns about overwriting an existing 'url' inextra_fields
?
When mergingextra_fields
with{"url": url}
, the new key unconditionally overrides. If theextra_fields
dictionary already contained aurl
entry, it would be lost. Is this desired, or should we handle it differently? wdyt?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2257-2257
: Are transformations intended for all download operations?
We are passingtransformations=transformations
into theRecordSelector
for the download retriever. Should we allow users to configure a distinct transformations list exclusively for downloads? wdyt?
📝 WalkthroughWalkthroughThis pull request introduces modifications across multiple files in the Airbyte CDK, focusing on enhancing record processing and stream slice handling. The changes primarily affect the Changes
Sequence DiagramsequenceDiagram
participant Factory as ModelToComponentFactory
participant Selector as RecordSelector
participant Job as AsyncHttpJobRepository
participant Slice as StreamSlice
Factory->>Selector: Create with transformations
Job->>Slice: Construct with job parameters
Slice-->>Job: Provide context for record fetching
Possibly related PRs
Suggested labels
Suggested reviewers
Hey there! 👋 I noticed these changes look quite interesting. Would you like me to elaborate on any specific aspect of the modifications? The transformation handling and stream slice updates seem particularly intriguing. Wdyt? 🤔 Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/types.py (1)
155-157
: Would you consider expanding the docstring to explain the new boolean evaluation?Currently,
__bool__
returns true if either the main slice orextra_fields
is non-empty. It might be helpful to clarify this in the docstring or comment, so future maintainers understand why it’s deemed “truthy” if either portion is present. wdyt?airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)
192-197
: Would you consider verifying whether the “url” key already exists injob_slice.extra_fields
?Merging
"url"
intoextra_fields
might accidentally overwrite a name collision. Checking this in advance or documenting the assumption could prevent unexpected behavior. wdyt?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2257-2257
: Any interest in logging or clarifying transformations usage here?We’re now passing
transformations
to theSimpleRetriever
’sRecordSelector
. It might be good to describe in code comments how these transformations are applied when reading job results. wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/http_job_repository.py
(1 hunks)airbyte_cdk/sources/types.py
(1 hunks)unit_tests/sources/declarative/requesters/test_http_job_repository.py
(1 hunks)
🔇 Additional comments (1)
unit_tests/sources/declarative/requesters/test_http_job_repository.py (1)
87-87
: Could we handle missing “url” more gracefully?In
path="{{stream_slice.extra_fields['url']}}"
, a KeyError could arise if'url'
is absent fromextra_fields
. Perhaps we could add a default or an assertion? wdyt?
What
url
asextra_field
to ignore it in state managertransformations
to download retrieverCaution
changing url path in
stream_slice
for download retriever is technically a breaking change, but I don't want to bump major version sinceAsyncRetriever
is anExperimentalClass
Reason
see #192 (comment)
Summary by CodeRabbit
Release Notes
New Features
StreamSlice
class with boolean evaluation supportImprovements
The changes introduce more dynamic and flexible data processing capabilities within the Airbyte CDK, allowing for more nuanced record transformations and stream handling.