7 changes: 5 additions & 2 deletions Dockerfile
@@ -1,5 +1,8 @@
 FROM php:7.4-cli
 
+ENV LANG C.UTF-8
+ENV LC_ALL C.UTF-8
+
 ARG COMPOSER_FLAGS="--prefer-dist --no-interaction"
 ARG DEBIAN_FRONTEND=noninteractive
 ENV COMPOSER_ALLOW_SUPERUSER 1
@@ -103,8 +106,8 @@ RUN set -eux; \
 *) echo "unsupported architecture"; exit 1 ;; \
 esac; \
 for key in $(curl -sL https://raw.githubusercontent.com/nodejs/docker-node/HEAD/keys/node.keys); do \
-gpg --batch --keyserver hkps://keys.openpgp.org --recv-keys "$key" || \
-gpg --batch --keyserver keyserver.ubuntu.com --recv-keys "$key"; \
+{ gpg --batch --keyserver keyserver.ubuntu.com --recv-keys "$key" && gpg --list-keys "$key" > /dev/null; } || \
+{ gpg --batch --keyserver hkps://keys.openpgp.org --recv-keys "$key" && gpg --list-keys "$key" > /dev/null; } \
 done; \
 curl -fsSLO --compressed "https://nodejs.org/dist/$NODE_VERSION/node-$NODE_VERSION-linux-$ARCH.tar.xz"; \
 curl -fsSLO --compressed "https://nodejs.org/dist/$NODE_VERSION/SHASUMS256.txt.asc"; \
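Note on the hunk above: besides swapping the keyserver order, the new command verifies each fetch with gpg --list-keys, so a keyserver that answers but does not actually deliver a usable key still falls through to the alternative. A minimal Python sketch of the same fetch-then-verify fallback, illustrative only (the helper and the fingerprint are hypothetical, not part of this PR):

import subprocess

def fetch_and_verify_key(key: str, keyservers: list[str]) -> bool:
    # Try each keyserver until the key is both fetched and present in the keyring.
    for server in keyservers:
        recv = subprocess.run(["gpg", "--batch", "--keyserver", server, "--recv-keys", key])
        if recv.returncode != 0:
            continue  # fetch failed outright; try the next server
        # Mirror the Dockerfile's added check: confirm the key really imported.
        if subprocess.run(["gpg", "--list-keys", key], capture_output=True).returncode == 0:
            return True
    return False

if not fetch_and_verify_key("0123456789ABCDEF", ["keyserver.ubuntu.com", "hkps://keys.openpgp.org"]):
    raise SystemExit("could not import the signing key from any keyserver")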
1 change: 1 addition & 0 deletions python-sync-actions/flake8.cfg
@@ -5,6 +5,7 @@ exclude =
     tests,
     example
     venv
+ignore = E203,W503
 max-line-length = 120
 
 # F812: list comprehension redefines ...
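For context: E203 (whitespace before ':') and W503 (line break before a binary operator) are the two pycodestyle checks that conflict with Black-style formatting, which the rest of this PR applies; the url[len(base_url) :] slice in curl.py below is exactly the shape E203 would otherwise flag. A small illustration, not taken from the repository:

url = "https://api.example.com/v2/items"
base_url = "https://api.example.com"

# E203: Black inserts a space before ":" when a slice bound is an expression.
endpoint = url[len(base_url) :]

# W503: when a wrapped condition is split, the operator starts the next line.
is_api_path = (
    endpoint.startswith("/")
    and "example.com" in url
)

print(endpoint, is_api_path)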
1 change: 1 addition & 0 deletions python-sync-actions/src/__init__.py
@@ -1,3 +1,4 @@
+import sys
 import os
 
 sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/../src")
45 changes: 23 additions & 22 deletions python-sync-actions/src/actions/curl.py
@@ -27,7 +27,7 @@ def to_dict(self):
             "dataType": self.dataType,
             "dataField": self.dataField,
             "params": self.params,
-            "headers": self.headers
+            "headers": self.headers,
         }
 
 
@@ -56,7 +56,7 @@ def retrieve_url(curl_command: str) -> str:
         raise ValueError("Not a valid cURL command")
 
     # find valid URL
-    url = ''
+    url = ""
     for t in tokens:
         token_string = t
         if t.startswith("'") and t.endswith("'"):
@@ -80,16 +80,16 @@ def normalize_url_in_curl(curl_command: str) -> tuple[str, str]:
     Returns: converted command, original url
 
     """
-    unsupported_characters = ['{', '}']
+    unsupported_characters = ["{", "}"]
     url = retrieve_url(curl_command)
     original_url = None
     new_url = url
 
     if any([char in url for char in unsupported_characters]):
-        original_url = url.split('?')[0]
+        original_url = url.split("?")[0]
 
         for character in unsupported_characters:
-            new_url = new_url.replace(character, '_')
+            new_url = new_url.replace(character, "_")
 
     new_curl_command = curl_command.replace(url, new_url)
     return new_curl_command, original_url
@@ -130,7 +130,7 @@ def parse_curl(curl_command: str) -> dict:
     result = json.loads(stdout_str)
     # replace the original URL in case it was changed to enable child job detection
     if original_url:
-        result['url'] = original_url
+        result["url"] = original_url
 
     return result
 
@@ -146,7 +146,7 @@ def _get_endpoint_path(base_url: str, url: str) -> str:
 
     """
     if url.startswith(base_url):
-        return url[len(base_url):]
+        return url[len(base_url) :]
     else:
         return url
 
@@ -161,10 +161,10 @@ def _get_content_type(headers: dict) -> str:
 
     """
     for key, value in headers.items():
-        if key.lower() == 'content-type':
+        if key.lower() == "content-type":
             return value
 
-    return ''
+    return ""
 
 
 def build_job_from_curl(curl_command: str, base_url: str = None, is_child_job: bool = False) -> JobTemplate:
@@ -181,32 +181,33 @@ def build_job_from_curl(curl_command: str, base_url: str = None, is_child_job: b
     parsed_curl = parse_curl(curl_command)
 
     if base_url:
-        job_template.endpoint = _get_endpoint_path(base_url, parsed_curl['url'])
+        job_template.endpoint = _get_endpoint_path(base_url, parsed_curl["url"])
     else:
-        job_template.endpoint = parsed_curl['url']
+        job_template.endpoint = parsed_curl["url"]
 
-    parsed_method = parsed_curl['method'].upper()
-    content_type = _get_content_type(parsed_curl.get('headers', {})).lower()
+    parsed_method = parsed_curl["method"].upper()
+    content_type = _get_content_type(parsed_curl.get("headers", {})).lower()
     if parsed_method == "POST" and content_type == "application/json":
-        job_template.params = parsed_curl.get('data', {})
+        job_template.params = parsed_curl.get("data", {})
         job_template.method = "POST"
 
-        if parsed_curl.get('queries'):
+        if parsed_curl.get("queries"):
             raise UserException("Query parameters are not supported for POST requests with JSON content type")
 
     elif parsed_method == "POST" and content_type == "application/x-www-form-urlencoded":
-        job_template.params = parsed_curl.get('data', {})
+        job_template.params = parsed_curl.get("data", {})
         job_template.method = "FORM"
     elif parsed_method == "GET":
-        job_template.params = parsed_curl.get('queries', {})
+        job_template.params = parsed_curl.get("queries", {})
         job_template.method = "GET"
     else:
-        raise UserException(f"Unsupported method {parsed_method}, "
-                            f"only GET, POST with JSON and POST with form data are supported.")
+        raise UserException(
+            f"Unsupported method {parsed_method}, only GET, POST with JSON and POST with form data are supported."
+        )
 
-    job_template.method = parsed_curl['method'].upper()
-    job_template.headers = parsed_curl.get('headers', {})
-    job_template.dataType = job_template.endpoint.split('/')[-1]
+    job_template.method = parsed_curl["method"].upper()
+    job_template.headers = parsed_curl.get("headers", {})
+    job_template.dataType = job_template.endpoint.split("/")[-1]
     job_template.dataField = {"path": ".", "separator": "."}
 
     return job_template
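For reference, a sketch of how the reformatted builder might be called; the JobTemplate fields match this diff, but the import path and the parsed values are assumptions:

from actions.curl import build_job_from_curl  # import path assumed

job = build_job_from_curl(
    "curl -X GET 'https://api.example.com/v2/users?page=1'",
    base_url="https://api.example.com",
)
print(job.endpoint)  # e.g. "/v2/users" once base_url is stripped
print(job.method)    # "GET"
print(job.params)    # roughly {"page": "1"}, taken from the parsed query string
print(job.dataType)  # "users", the last segment of the endpoint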
132 changes: 61 additions & 71 deletions python-sync-actions/src/actions/mapping.py
@@ -7,23 +7,21 @@
 
 
 class HeaderNormalizer(DefaultHeaderNormalizer):
-
     def _normalize_column_name(self, column_name: str) -> str:
         # Your implementation here
 
         column_name = self._replace_whitespace(column_name)
         column_name = self._replace_forbidden(column_name)
-        if column_name.startswith('_'):
+        if column_name.startswith("_"):
             column_name = column_name[1:]
 
         return column_name
 
 
 class StuctureAnalyzer:
-
     def __init__(self):
         self.analyzer: Analyzer = Analyzer()
-        self.header_normalizer = HeaderNormalizer(forbidden_sub='_')
+        self.header_normalizer = HeaderNormalizer(forbidden_sub="_")
 
     def parse_row(self, row: dict[str, Any]):
         current_path = []
@@ -34,13 +32,14 @@ def parse_row(self, row: dict[str, Any]):
         for name, value in row.items():
             self.analyzer.analyze_object(current_path, name, value)
 
-    def infer_mapping(self,
-                      primary_keys: Optional[list[str]] = None,
-                      parent_pkeys: Optional[list[str]] = None,
-                      user_data_columns: Optional[list[str]] = None,
-                      path_separator: str = '.',
-                      max_level: int = 2
-                      ) -> dict:
+    def infer_mapping(
+        self,
+        primary_keys: Optional[list[str]] = None,
+        parent_pkeys: Optional[list[str]] = None,
+        user_data_columns: Optional[list[str]] = None,
+        path_separator: str = ".",
+        max_level: int = 2,
+    ) -> dict:
         """
         Infer first level Generic Extractor mapping from data sample.
         Args:
@@ -53,15 +52,14 @@ def infer_mapping(self,
         Returns:
 
         """
-        result_mapping = self.__infer_mapping_from_structure_recursive(self.analyzer.node_hierarchy['children'],
-                                                                       primary_keys,
-                                                                       path_separator, max_level)
+        result_mapping = self.__infer_mapping_from_structure_recursive(
+            self.analyzer.node_hierarchy["children"], primary_keys, path_separator, max_level
+        )
 
         if parent_pkeys:
             for key in parent_pkeys:
                 if key in result_mapping:
-                    raise UserException(f"Parent {key} is already in the mapping, "
-                                        f"please change the placeholder name")
+                    raise UserException(f"Parent {key} is already in the mapping, please change the placeholder name")
                 result_mapping[key] = MappingElements.parent_primary_key_column(key)
         if user_data_columns:
             for key in user_data_columns:
@@ -82,24 +80,28 @@ def dedupe_values(mapping: dict) -> dict:
             simple_mapping = True
             col_name = value
             if isinstance(value, dict):
-                col_name = value['mapping']['destination']
+                col_name = value["mapping"]["destination"]
                 simple_mapping = False
 
             if col_name in seen.keys():
                 seen[col_name] += 1
                 if simple_mapping:
                     mapping[key] = f"{col_name}_{seen[col_name]}"
                 else:
-                    mapping[key]['mapping']['destination'] = f"{col_name}_{seen[col_name]}"
+                    mapping[key]["mapping"]["destination"] = f"{col_name}_{seen[col_name]}"
             else:
                 seen[col_name] = 0
         return mapping
 
-    def __infer_mapping_from_structure_recursive(self, node_hierarchy: dict[str, Any],
-                                                 primary_keys: Optional[list[str]] = None,
-                                                 path_separator: str = '.',
-                                                 max_level: int = 2, current_mapping: dict = None,
-                                                 current_level: int = 0) -> dict:
+    def __infer_mapping_from_structure_recursive(
+        self,
+        node_hierarchy: dict[str, Any],
+        primary_keys: Optional[list[str]] = None,
+        path_separator: str = ".",
+        max_level: int = 2,
+        current_mapping: dict = None,
+        current_level: int = 0,
+    ) -> dict:
         """
         Infer first level Generic Extractor mapping from data sample.
         Args:
@@ -115,7 +117,7 @@ def __infer_mapping_from_structure_recursive(self, node_hierarchy: dict[str, Any
             current_mapping = {}
         for key, value in node_hierarchy.items():
             if isinstance(value, dict):
-                current_node: Node = value['node']
+                current_node: Node = value["node"]
                 path_key = path_separator.join(current_node.path)
                 normalized_header_name = self.header_normalizer._normalize_column_name(current_node.header_name)  # noqa
                 match current_node.data_type:
@@ -127,10 +129,14 @@ def __infer_mapping_from_structure_recursive(self, node_hierarchy: dict[str, Any
 
                     case NodeType.DICT:
                         if current_level <= max_level:
-                            self.__infer_mapping_from_structure_recursive(value['children'], primary_keys,
-                                                                          path_separator,
-                                                                          max_level, current_mapping,
-                                                                          current_level)
+                            self.__infer_mapping_from_structure_recursive(
+                                value["children"],
+                                primary_keys,
+                                path_separator,
+                                max_level,
+                                current_mapping,
+                                current_level,
+                            )
                         else:
                             current_mapping[path_key] = MappingElements.force_type_column(normalized_header_name)
                     case _:
@@ -141,9 +147,9 @@ def __infer_mapping_from_structure_recursive(self, node_hierarchy: dict[str, Any
                 if all(isinstance(item, dict) for item in value):
                     for idx, item in enumerate(value):
                         list_key = f"{key}[{idx}]"
-                        self.__infer_mapping_from_structure_recursive({list_key: item}, primary_keys,
-                                                                      path_separator, max_level,
-                                                                      current_mapping, current_level)
+                        self.__infer_mapping_from_structure_recursive(
+                            {list_key: item}, primary_keys, path_separator, max_level, current_mapping, current_level
+                        )
                 else:
                     # Handle list of non-dictionary items
                     current_mapping[key] = MappingElements.force_type_column(key)
@@ -156,49 +162,29 @@ def __infer_mapping_from_structure_recursive(self, node_hierarchy: dict[str, Any
 class MappingElements:
     @staticmethod
     def primary_key_column(column_name: str) -> dict:
-        return {
-            "mapping": {
-                "destination": column_name,
-                "primaryKey": True
-            }
-        }
+        return {"mapping": {"destination": column_name, "primaryKey": True}}
 
     @staticmethod
     def parent_primary_key_column(column_name: str) -> dict:
-        return {
-            "type": "user",
-            "mapping": {
-                "destination": column_name,
-                "primaryKey": True
-            }
-        }
+        return {"type": "user", "mapping": {"destination": column_name, "primaryKey": True}}
 
     @staticmethod
     def force_type_column(column_name: str) -> dict:
-        return {
-            "type": "column",
-            "mapping": {
-                "destination": column_name
-            },
-            "forceType": True
-        }
+        return {"type": "column", "mapping": {"destination": column_name}, "forceType": True}
 
     @staticmethod
     def user_data_column(column_name: str) -> dict:
-        return {
-            "type": "user",
-            "mapping": {
-                "destination": column_name
-            }
-        }
-
-
-def infer_mapping(data: list[dict],
-                  primary_keys: Optional[list[str]] = None,
-                  parent_pkeys: Optional[list[str]] = None,
-                  user_data_columns: Optional[list[str]] = None,
-                  path_separator: str = '.',
-                  max_level_nest_level: int = 2) -> dict:
+        return {"type": "user", "mapping": {"destination": column_name}}
+
+
+def infer_mapping(
+    data: list[dict],
+    primary_keys: Optional[list[str]] = None,
+    parent_pkeys: Optional[list[str]] = None,
+    user_data_columns: Optional[list[str]] = None,
+    path_separator: str = ".",
+    max_level_nest_level: int = 2,
+) -> dict:
     """
     Infer first level Generic Extractor mapping from data sample.
     Args:
@@ -223,10 +209,13 @@ def infer_mapping(data: list[dict],
     for row in data:
         analyzer.parse_row(row)
 
-    result = analyzer.infer_mapping(primary_keys or [], parent_pkeys or [],
-                                    user_data_columns or [],
-                                    path_separator=path_separator,
-                                    max_level=max_level_nest_level)
+    result = analyzer.infer_mapping(
+        primary_keys or [],
+        parent_pkeys or [],
+        user_data_columns or [],
+        path_separator=path_separator,
+        max_level=max_level_nest_level,
+    )
     return result
 
 
@@ -239,5 +228,6 @@ def get_primary_key_columns(mapping: dict) -> list[str]:
     Returns:
 
     """
-    return [key for key, value in mapping.items() if
-            isinstance(value, dict) and value.get('mapping', {}).get('primaryKey')]
+    return [
+        key for key, value in mapping.items() if isinstance(value, dict) and value.get("mapping", {}).get("primaryKey")
+    ]
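And a sketch of the inference round-trip that the mapping.py changes reformat: sample rows in, Generic Extractor mapping out. The import path and the exact output keys are assumptions:

from actions.mapping import infer_mapping, get_primary_key_columns  # path assumed

rows = [
    {"id": 1, "name": "a", "address": {"city": "Prague", "zip": "11000"}},
    {"id": 2, "name": "b", "address": {"city": "Brno", "zip": "60200"}},
]

# Nested dicts are flattened with the path separator up to the nesting limit,
# so the result should contain keys such as "address.city"; the "id" entry
# carries primaryKey: True because it is listed in primary_keys.
mapping = infer_mapping(rows, primary_keys=["id"], max_level_nest_level=2)
print(get_primary_key_columns(mapping))  # expected: ["id"]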