Skip to content

Commit

Permalink
feat: POC for url_requester
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 committed Jan 9, 2025
1 parent 6d5ce67 commit 465cd81
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2972,6 +2972,11 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
url_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
download_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.
anyOf:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,28 +737,38 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Declarative model describing a `FlattenFields` component.
class FlattenFields(BaseModel):
# Discriminator: only the literal string "FlattenFields" is accepted.
type: Literal["FlattenFields"]
# Optional free-form mapping, supplied under the "$parameters" alias; defaults to None.
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Declarative model describing a `KeysReplace` component: it carries the value
# to replace (`old`) and the value to set in its place (`new`).
class KeysReplace(BaseModel):
    # Discriminator: only the literal string "KeysReplace" is accepted.
    type: Literal["KeysReplace"]
    # Required. The examples show both a plain string and `{{ ... }}` templates.
    old: str = Field(
        ...,
        description="Old value to replace.",
        # Exactly one `examples` argument: repeating the keyword in a single
        # call is a SyntaxError.
        examples=[
            " ",
            "{{ record.id }}",
            "{{ config['id'] }}",
            "{{ stream_slice['id'] }}",
        ],
        title="Old value",
    )
    # Required. Same example shapes as `old`.
    new: str = Field(
        ...,
        description="New value to set.",
        examples=[
            "_",
            "{{ record.id }}",
            "{{ config['id'] }}",
            "{{ stream_slice['id'] }}",
        ],
        title="New value",
    )
    # Optional free-form mapping, supplied under the "$parameters" alias; defaults to None.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Declarative model describing a `FlattenFields` component.
class FlattenFields(BaseModel):
# Discriminator: only the literal string "FlattenFields" is accepted.
type: Literal["FlattenFields"]
# Optional free-form mapping, supplied under the "$parameters" alias; defaults to None.
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

Expand Down Expand Up @@ -2035,6 +2045,10 @@ class AsyncRetriever(BaseModel):
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.",
)
url_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
None,
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
)
download_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,6 +2322,16 @@ def create_async_retriever(
if model.delete_requester
else None
)
url_requester = (
self._create_component_from_model(
model=model.url_requester,
decoder=decoder,
config=config,
name=f"job extract_url - {name}",
)
if model.url_requester
else None
)
status_extractor = self._create_component_from_model(
model=model.status_extractor, decoder=decoder, config=config, name=name
)
Expand All @@ -2332,6 +2342,7 @@ def create_async_retriever(
creation_requester=creation_requester,
polling_requester=polling_requester,
download_retriever=download_retriever,
url_requester=url_requester,
abort_requester=abort_requester,
delete_requester=delete_requester,
status_extractor=status_extractor,
Expand Down
22 changes: 19 additions & 3 deletions airbyte_cdk/sources/declarative/requesters/http_job_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class AsyncHttpJobRepository(AsyncJobRepository):
creation_requester: Requester
polling_requester: Requester
download_retriever: SimpleRetriever
url_requester: Optional[
Requester
] # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from
abort_requester: Optional[Requester]
delete_requester: Optional[Requester]
status_extractor: DpathExtractor
Expand Down Expand Up @@ -186,9 +189,7 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
"""

for url in self.urls_extractor.extract_records(
self._polling_job_response_by_id[job.api_job_id()]
):
for url in self._get_download_url(job):
stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={})
for message in self.download_retriever.read_records({}, stream_slice):
if isinstance(message, Record):
Expand Down Expand Up @@ -226,3 +227,18 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
cursor_slice={},
)
return stream_slice

def _get_download_url(self, job: AsyncJob) -> Iterable[str]:
if not self.url_requester:
url_response = self._polling_job_response_by_id[job.api_job_id()]
else:
stream_slice: StreamSlice = StreamSlice(
partition={
"polling_job_response": self._polling_job_response_by_id[job.api_job_id()]
},
cursor_slice={},
)
url_response = self.url_requester.send_request(
stream_slice=stream_slice
)
yield from self.urls_extractor.extract_records(url_response)

0 comments on commit 465cd81

Please sign in to comment.