Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(airbyte-cdk): unable to create custom retriever #198

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,12 @@ def create_exponential_backoff_strategy(
)

def create_http_requester(
self, model: HttpRequesterModel, decoder: Decoder, config: Config, *, name: str
self,
model: HttpRequesterModel,
config: Config,
decoder: Decoder = JsonDecoder(parameters={}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Help me with my shitty Python knowledge. The signature change does two things here:

  1. Add default value for decoder — this is additive and will fix situations where decoder was not passed in
  2. The order of named arguments changed. Is this important, or did we always pass them named and it's compatible?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of named arguments changed. Is this important, or did we always pass them named and it's compatible?

This is important but we assume model_to_component_factory is quite mostly private to the CDK hence I'm fine with this breaking change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add default value for decoder — this is additive and will fix situations where decoder was not passed in

Yes

The order of named arguments changed. Is this important, or did we always pass them named and it's compatible?

Usually (and from what I could find in CDK and airbyte base code), this method is only called by _create_component_from_model, which passes the parameters as keyword args.

image

image

image

*,
name: str,
) -> HttpRequester:
authenticator = (
self._create_component_from_model(
Expand Down Expand Up @@ -1976,7 +1981,7 @@ def create_record_selector(
config: Config,
*,
name: str,
transformations: List[RecordTransformation],
transformations: Optional[List[RecordTransformation]] = None,
aldogonzalez8 marked this conversation as resolved.
Show resolved Hide resolved
decoder: Optional[Decoder] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
**kwargs: Any,
Expand Down Expand Up @@ -2008,7 +2013,7 @@ def create_record_selector(
name=name,
config=config,
record_filter=record_filter,
transformations=transformations,
transformations=transformations or [],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: Is it intentional that transformations default to None on argument list of the method, but to [] here? Should we just say transormations: List[RecordTransformation] = [] in the args list, or is it parsed to None explicitly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lint was very annoyed about using a mutable object as the default, and I thought it was safer to pass [] as the default factory does here for RecordSelector.

schema_normalization=schema_normalization,
parameters=model.parameters or {},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,42 @@ def test_create_custom_schema_loader():
assert isinstance(component, MyCustomSchemaLoader)


class MyCustomRetriever(SimpleRetriever):
pass


def test_create_custom_retriever():
stream_model = {
"type": "DeclarativeStream",
"retriever": {
"type": "CustomRetriever",
"class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.MyCustomRetriever",
"record_selector": {
"type": "RecordSelector",
"extractor": {
"type": "DpathExtractor",
"field_path": [],
},
"$parameters": {"name": ""},
},
"requester": {
"type": "HttpRequester",
"name": "list",
"url_base": "orange.com",
"path": "/v1/api",
"$parameters": {"name": ""},
},
},
}

stream = factory.create_component(
model_type=DeclarativeStreamModel, component_definition=stream_model, config=input_config
)

assert isinstance(stream, DeclarativeStream)
assert isinstance(stream.retriever, MyCustomRetriever)


@freezegun.freeze_time("2021-01-01 00:00:00")
@pytest.mark.parametrize(
"config, manifest, expected",
Expand Down
Loading