Skip to content

Commit

Permalink
Fix mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi committed Dec 3, 2024
1 parent 807d23e commit 59c5c7f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel):
scopes: Optional[List[str]] = Field(
None,
description="List of scopes that should be granted to the access token.",
examples=[
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
],
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
title="Scopes",
)
token_expiry_date: Optional[str] = Field(
Expand Down Expand Up @@ -902,28 +900,24 @@ class OAuthConfigSpecification(BaseModel):
class Config:
extra = Extra.allow

oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
)
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
)
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
)
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
)
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
None,
Expand All @@ -941,9 +935,7 @@ class Config:
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
examples=[
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
],
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
title="OAuth input specification",
)
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -1565,25 +1557,21 @@ class Config:
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = (
Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
)
)
name: Optional[str] = Field(
"", description="The stream name.", example=["Users"], title="Name"
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
)
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
primary_key: Optional[PrimaryKey] = Field(
"", description="The primary key of the stream.", title="Primary Key"
)
schema_loader: Optional[
Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = (
Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
)
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
Expand Down Expand Up @@ -1812,11 +1800,7 @@ class SimpleRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[
Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
]
],
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
]
] = Field(
[],
Expand Down Expand Up @@ -1885,11 +1869,7 @@ class AsyncRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[
Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
]
],
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
]
] = Field(
[],
Expand Down
36 changes: 23 additions & 13 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ class SchemaTypeIdentifier:
key_pointer: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
type_pointer: Optional[List[Union[InterpolatedString, str]]] = None
types_map: List[TypesPair] = None
types_map: Optional[List[TypesPair]] = None
is_nullable: bool = True

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.schema_pointer = self._update_pointer(self.schema_pointer, parameters)
self.key_pointer = self._update_pointer(self.key_pointer, parameters)
self.schema_pointer = self._update_pointer(self.schema_pointer, parameters) # type: ignore[assignment] # This is a required field in the model
self.key_pointer = self._update_pointer(self.key_pointer, parameters) # type: ignore[assignment] # This is a required field in the model
self.type_pointer = (
self._update_pointer(self.type_pointer, parameters) if self.type_pointer else None
)
Expand Down Expand Up @@ -110,7 +110,8 @@ def get_json_schema(self) -> Mapping[str, Any]:
properties = {}
for retrieved_record in self.retriever.read_records({}):
raw_schema = self._extract_data(
retrieved_record, self.schema_type_identifier.schema_pointer
retrieved_record, # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
self.schema_type_identifier.schema_pointer,
)
for property_definition in raw_schema:
key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
Expand Down Expand Up @@ -167,9 +168,16 @@ def _get_type(
self._get_airbyte_type(mapped_field_type[1]), is_nullable
)
return {"oneOf": [first_type, second_type]}
return self._make_field_nullable(self._get_airbyte_type(mapped_field_type), is_nullable)
elif isinstance(mapped_field_type, str):
return self._make_field_nullable(self._get_airbyte_type(mapped_field_type), is_nullable)
else:
raise ValueError(
f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
)

def _replace_type_if_not_valid(self, field_type: str) -> str:
def _replace_type_if_not_valid(
self, field_type: Union[List[str], str]
) -> Union[List[str], str]:
"""
Replaces a field type if it matches a type mapping in `types_map`.
"""
Expand All @@ -186,11 +194,10 @@ def _make_field_nullable(
"""
Wraps a field type to allow null values if `is_nullable` is True.
"""

updated_field_type = dict(deepcopy(field_type))
if is_nullable:
field_type = deepcopy(field_type)
field_type["type"] = ["null", field_type["type"]]
return field_type
updated_field_type["type"] = ["null", updated_field_type["type"]]
return updated_field_type

@staticmethod
def _get_airbyte_type(field_type: str) -> Mapping[str, Any]:
Expand All @@ -215,11 +222,14 @@ def _extract_data(
if len(extraction_path) == 0:
return body

path = [path.eval(self.config) for path in extraction_path]
path = [
path.eval(self.config) if not isinstance(path, str) else path
for path in extraction_path
]

if "*" in path:
extracted = dpath.values(body, path)
extracted = dpath.values(body, path) # type: ignore # extracted will be a MutableMapping, given input data structure
else:
extracted = dpath.get(body, path, default=default)
extracted = dpath.get(body, path, default=default) # type: ignore # extracted will be a MutableMapping, given input data structure

return extracted

0 comments on commit 59c5c7f

Please sign in to comment.