diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index a9738ff..73b5f07 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -13,7 +13,7 @@ jobs:
     runs-on: ubuntu-18.04
     strategy:
       matrix:
-        python-version: ['3.6.x', '3.7.x']
+        python-version: ['3.7.x', '3.8.x']
     steps:
       - name: Checkout
         uses: actions/checkout@v1
@@ -21,7 +21,12 @@ jobs:
         uses: actions/setup-python@v1
         with:
           python-version: ${{ matrix.python-version }}
-      - name: Install dependencies
-        run: pip3 install -r requirements.txt
+      - name: Run image
+        uses: abatilo/actions-poetry@v2.0.0
+        with:
+          poetry-version: '1.1.4'
+      - name: Install library
+        run: poetry install --no-interaction
       - name: Run python unit tests
-        run: make test
+        run: |
+          poetry run python -bb -m pytest tests
diff --git a/carte_cli/extractor/json_schema_extractor.py b/carte_cli/extractor/json_schema_extractor.py
new file mode 100644
index 0000000..719337e
--- /dev/null
+++ b/carte_cli/extractor/json_schema_extractor.py
@@ -0,0 +1,154 @@
+from typing import List, Union, Iterator, Any, Iterable, Dict
+from carte_cli.model.carte_table_model import TableMetadata, ColumnMetadata, TableType
+from carte_cli.utils.file_io import read_json
+from databuilder.extractor.base_extractor import Extractor
+from pyhocon import ConfigTree
+import boto3
+import json
+import copy
+
+
+class JSONSchemaExtractor(Extractor):
+
+    SCHEMA_PATH_KEY = "schema_path"
+    S3_PROTOCOL = "s3://"
+
+    def __init__(
+        self,
+        connection_name: str,
+        database: str,
+        schema_path: str,
+        pivot_column: str = None,
+        object_expand: Iterable[str] = None,
+    ):
+        super().__init__()
+        self.connection_name = connection_name
+        self.database = database
+        self.schema_path = schema_path
+        self.s3 = boto3.resource("s3")
+        self.pivot_column = pivot_column
+        self.object_expand = object_expand
+        self._extract_iter = iter(self._get_extract_iter())
+
+    def init(self, conf: ConfigTree) -> None:
+        self.conf = conf
+
+    def extract(self) -> Any:
+        try:
+            return next(self._extract_iter)
+        except StopIteration:
+            return None
+
+    def get_scope(self):
+        return "carte.extractor.json_schema"
+
+    def _get_extract_iter(self) -> Iterator[TableMetadata]:
+        schema = self._get_schema()
+
+        if "type" not in schema or schema["type"] != "object":
+            raise ValueError("Schema type has to be 'object'")
+
+        tables = self._process_schema(schema)
+        for table in tables:
+            yield table
+
+    def _get_schema(self):
+        if self.schema_path.startswith(self.S3_PROTOCOL):
+            schema = self._read_file_from_s3(self.schema_path)
+        else:
+            schema = read_json(self.schema_path)
+
+        return schema
+
+
+    def _process_schema(
+        self, schema: dict, column_prefix: str = ""
+    ) -> Iterable[TableMetadata]:
+        if self.pivot_column:
+            if "oneOf" not in schema:
+                raise ValueError(
+                    "Pivot column provided, but no top-level 'oneOf' in schema"
+                )
+            schemas = {}
+            for constraint in schema["oneOf"]:
+                try:
+                    subschema_name = str(constraint["properties"][self.pivot_column][
+                        "const"
+                    ])
+                except KeyError:
+                    raise ValueError("Pivot column inside oneOf should be a const")
+
+                merged_schema = self._deep_merge_dicts(constraint, copy.deepcopy(schema))
+                schemas[subschema_name] = merged_schema
+
+        else:
+            schemas = {self.normalise(schema.get("title", "schema")): schema}
+
+        return [self._schema_to_table(name, schema) for name, schema in schemas.items()]
+
+    def _deep_merge_dicts(self, source: dict, destination: dict):
+        for key, value in source.items():
+            if isinstance(value, dict):
+                # get node or create one
+                node = destination.setdefault(key, {})
+                self._deep_merge_dicts(value, node)
+            elif (
+                isinstance(value, list)
+                and key in destination
+                and isinstance(destination[key], list)
+            ):
+                destination[key] += value
+            else:
+                destination[key] = value
+
+        return destination
+
+    def _schema_to_table(self, name: str, schema: dict) -> TableMetadata:
+        required_columns = schema.get("required", [])
+        columns = {}
+        for key, val in schema.get("properties", {}).items():
+            columns[key] = val
+
+            if self.object_expand and key in self.object_expand:
+                for subkey, subval in val.get("properties", {}).items():
+                    columns[f"{key}.{subkey}"] = subval
+
+        mapped_columns = [
+            self._process_column(column_name, column_def, required_columns)
+            for column_name, column_def in columns.items()
+        ]
+
+        return TableMetadata(
+            name=name,
+            database=self.database,
+            connection=self.connection_name,
+            location=self.schema_path,
+            columns=mapped_columns,
+            table_type=TableType.TABLE,
+        )
+
+    def _process_column(
+        self, column_name: str, column_def: dict, required_columns: List[str]
+    ):
+        is_required = column_name in required_columns
+        column_type = column_def.get("type", "") + (
+            " (required)" if is_required else ""
+        )
+        column_values = column_def.get("enum", None)
+        return ColumnMetadata(
+            name=column_name, column_type=column_type, values=column_values
+        )
+
+    def _read_file_from_s3(self, path):
+        path_parts_without_protocol = path[len(self.S3_PROTOCOL) :].split("/")
+        bucket = path_parts_without_protocol[0]
+        key = "/".join(path_parts_without_protocol[1:])
+        content_object = self.s3.Object(bucket, key)
+
+        file_content = content_object.get()["Body"].read().decode("utf-8")
+        json_content = json.loads(file_content)
+
+        return json_content
+
+    def normalise(self, value: str):
+        return value.replace("-", "_").replace(" ", "_").lower()
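A minimal usage sketch of the new extractor (an editor's illustration, not part of the patch): the connection, database, and schema path below are invented, and `schema_path` may equally be an `s3://` URI, in which case the schema is fetched via boto3 instead of `read_json`. Note the schema is only read lazily, on the first `extract()` call, because `_get_extract_iter` is a generator.

```python
from pyhocon import ConfigFactory

from carte_cli.extractor.json_schema_extractor import JSONSchemaExtractor

# All names here are hypothetical; pivot_column and object_expand are optional.
extractor = JSONSchemaExtractor(
    connection_name="events",
    database="events_db",
    schema_path="schemas/event.json",  # or "s3://my-bucket/schemas/event.json"
    object_expand=["payload"],  # flatten nested objects into "payload.x" columns
)
extractor.init(ConfigFactory.from_dict({}))

# Databuilder extractor protocol: extract() returns one TableMetadata per call,
# then None once the schema has been fully processed.
table = extractor.extract()
while table is not None:
    print(table.name, [column.name for column in table.columns])
    table = extractor.extract()
```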
diff --git a/carte_cli/model/carte_table_model.py b/carte_cli/model/carte_table_model.py
index a34639d..b88da82 100644
--- a/carte_cli/model/carte_table_model.py
+++ b/carte_cli/model/carte_table_model.py
@@ -1,5 +1,5 @@
 from enum import Enum
-from typing import Any, List
+from typing import Any, List, Union
 from databuilder.models.table_metadata import TableMetadata as DatabuilderTableMetadata
 from databuilder.models.table_metadata import (
     ColumnMetadata as DatabuilderColumnMetadata,
@@ -28,10 +28,17 @@ class TableType(Enum):
 
 
 class ColumnMetadata:
-    def __init__(self, name: str, column_type: str, description: str):
+    def __init__(
+        self,
+        name: str,
+        column_type: str,
+        description: str = None,
+        values: Union[None, List[Any]] = None,
+    ):
         self.name = name
         self.column_type = column_type
         self.description = description
+        self.values = values
 
     @classmethod
     def from_databuilder(cls, column: DatabuilderColumnMetadata):
@@ -43,28 +50,33 @@ def from_databuilder(cls, column: DatabuilderColumnMetadata):
                 else ""
             ),
             column_type=column.type,
+            values=None,
         )
 
     @classmethod
     def from_frontmatter(cls, meta_dict):
         return cls(
             name=meta_dict["name"],
-            column_type=meta_dict.get("type"),
+            column_type=meta_dict.get("type"),
             description=meta_dict.get("description"),
+            values=meta_dict.get("values"),
         )
 
     def to_frontmatter(self):
-        return {
+        frontmatter = {
             "name": self.name,
             "type": self.column_type,
             "description": self.description,
         }
 
+        if self.values is not None:
+            frontmatter["values"] = self.values
+
+        return frontmatter
+
     def __repr__(self) -> str:
-        return "CarteTableMetadata({!r}, {!r}, {!r})".format(
-            self.name,
-            self.column_type,
-            self.description,
+        return "CarteTableMetadata({!r}, {!r}, {!r}, {!r})".format(
+            self.name, self.column_type, self.description, self.values
         )
@@ -74,10 +86,10 @@ def __init__(
         name: str,
         database: str,
         connection: str,
-        description: str,
         location: str,
         columns: List[ColumnMetadata],
         table_type: TableType,
+        description: str = None,
     ):
         self.name = name
         self.connection = connection
@@ -156,6 +168,7 @@ def merge_columns(self, existing, preserve_descriptions=True):
                     name=column_name,
                     column_type=column.column_type,
                     description=merged_description,
+                    values=column.values,
                 )
             )
         return merged_columns
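The net effect of the `ColumnMetadata` changes in isolation, shown on throwaway values: enum `values` now survive the frontmatter round trip, while columns without values serialise exactly as before, so existing docs files stay unchanged.

```python
from carte_cli.model.carte_table_model import ColumnMetadata

col = ColumnMetadata("status", "string", values=["active", "deleted"])
print(col.to_frontmatter())
# {'name': 'status', 'type': 'string', 'description': None,
#  'values': ['active', 'deleted']}

# A column with no enum values serialises exactly as before the change:
print(ColumnMetadata("age", "integer").to_frontmatter())
# {'name': 'age', 'type': 'integer', 'description': None}
```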
{!r})".format( + self.name, self.column_type, self.description, self.values ) @@ -74,10 +86,10 @@ def __init__( name: str, database: str, connection: str, - description: str, location: str, columns: List[ColumnMetadata], table_type: TableType, + description: str = None, ): self.name = name self.connection = connection @@ -156,6 +168,7 @@ def merge_columns(self, existing, preserve_descriptions=True): name=column_name, column_type=column.column_type, description=merged_description, + values=column.values, ) ) return merged_columns diff --git a/carte_cli/utils/config_parser.py b/carte_cli/utils/config_parser.py index 5927107..ce906ff 100644 --- a/carte_cli/utils/config_parser.py +++ b/carte_cli/utils/config_parser.py @@ -1,7 +1,10 @@ +from carte_cli.extractor.json_schema_extractor import JSONSchemaExtractor import io import importlib +from pyhocon.exceptions import ConfigException from ruamel.yaml import YAML from carte_cli.extractor.glue_extractor import GlueExtractor +from carte_cli.utils.file_io import read_yaml yaml = YAML() @@ -35,16 +38,37 @@ def create_postgres_connection(conn_dict): } +def create_json_schema_connection(conn_dict): + config = conn_dict.get(CONFIG_KEY, {}) + try: + connection_name = conn_dict.get("name", "json_schema") + database = config["database"] + schema_path = config["schema_path"] + except KeyError: + raise ConfigException( + "The name, database, and schema_path values are required for JSON Schema connections" + ) + return ( + JSONSchemaExtractor( + connection_name, + database, + schema_path, + pivot_column=config.get("pivot_column"), + object_expand=config.get("object_expand"), + ), + {}, + ) + + CONNECTION_FACTORIES = { "glue": create_glue_connection, "postgresql": create_postgres_connection, + "json_schema": create_json_schema_connection, } def parse_config(filename): - data = _read_file(filename) - - parsed_data = yaml.load(data) + parsed_data = read_yaml(filename) connections = parsed_data.get("connections", []) diff --git a/carte_cli/utils/frontmatter.py b/carte_cli/utils/frontmatter.py index 5219fe1..6821dbb 100644 --- a/carte_cli/utils/frontmatter.py +++ b/carte_cli/utils/frontmatter.py @@ -32,8 +32,8 @@ def dump(filename, metadata, content): buf.write(FRONTMATTER_SEPARATOR) yaml.dump(metadata, buf) buf.write(FRONTMATTER_SEPARATOR) - if content.strip() is not None: - buf.write(content) + if content is not None: + buf.write(content.strip()) with open(filename, "w") as f: print(buf.getvalue(), file=f) diff --git a/pyproject.toml b/pyproject.toml index 65dbef7..85e883a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "carte-cli" -version = "0.2.4" +version = "0.2.5" description = "A static site generator for data catalogs" authors = ["Balint Haller "] license = "Apache-2.0" diff --git a/tests/extractor/test_json_schema_extractor.py b/tests/extractor/test_json_schema_extractor.py new file mode 100644 index 0000000..5d82ea5 --- /dev/null +++ b/tests/extractor/test_json_schema_extractor.py @@ -0,0 +1,270 @@ +import boto3 +import unittest +from unittest.mock import patch +from pyhocon import ConfigFactory + +from carte_cli.extractor.json_schema_extractor import JSONSchemaExtractor +from carte_cli.model.carte_table_model import TableMetadata, ColumnMetadata, TableType + + +class TestJSONSchemaExtractor(unittest.TestCase): + def setUp(self) -> None: + self.conf = ConfigFactory.from_dict({}) + + def test_extraction_with_no_columns(self) -> None: + with patch.object(JSONSchemaExtractor, "_get_schema") as mock_get_schema: + 
diff --git a/carte_cli/utils/frontmatter.py b/carte_cli/utils/frontmatter.py
index 5219fe1..6821dbb 100644
--- a/carte_cli/utils/frontmatter.py
+++ b/carte_cli/utils/frontmatter.py
@@ -32,8 +32,8 @@ def dump(filename, metadata, content):
     buf.write(FRONTMATTER_SEPARATOR)
     yaml.dump(metadata, buf)
     buf.write(FRONTMATTER_SEPARATOR)
-    if content.strip() is not None:
-        buf.write(content)
+    if content is not None:
+        buf.write(content.strip())
 
     with open(filename, "w") as f:
         print(buf.getvalue(), file=f)
diff --git a/pyproject.toml b/pyproject.toml
index 65dbef7..85e883a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "carte-cli"
-version = "0.2.4"
+version = "0.2.5"
 description = "A static site generator for data catalogs"
 authors = ["Balint Haller "]
 license = "Apache-2.0"
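One behaviour worth spelling out before the tests: with a `pivot_column` set, each `oneOf` branch is deep-merged into a fresh copy of the base schema, so nested dicts merge recursively and lists such as `required` concatenate. A worked example with invented schemas; the comments show what `extractor._deep_merge_dicts(branch, copy.deepcopy(base))` produces.

```python
base = {
    "properties": {"name": {"type": "string"}, "glasses": {"type": "boolean"}},
    "required": ["name"],
}
branch = {
    "properties": {"glasses": {"const": True}, "focal_length": {"type": "float"}},
    "required": ["focal_length"],
}

# Merging branch into a deep copy of base yields:
# {
#     "properties": {
#         "name": {"type": "string"},
#         "glasses": {"type": "boolean", "const": True},
#         "focal_length": {"type": "float"},
#     },
#     "required": ["name", "focal_length"],
# }
# which is why focal_length reports as "float (required)" in the pivot tests below.
```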
{"type": "float"}, + }, + "required": ["focal_length"], + }, + { + "properties": { + "glasses": {"const": False}, + }, + }, + ], + } + + extractor = JSONSchemaExtractor( + "test-connection", "test-database", "test-schema-path", "glasses" + ) + extractor.init(self.conf) + + results = extractor.extract() + self.assertEqual(results.connection, "test-connection") + self.assertEqual(results.database, "test-database") + self.assertEqual(results.location, "test-schema-path") + self.assertEqual(results.name, "True") + self.assertEqual(results.columns[3].name, "focal_length") + self.assertEqual(results.columns[3].column_type, "float (required)") + + results = extractor.extract() + self.assertEqual(results.connection, "test-connection") + self.assertEqual(results.database, "test-database") + self.assertEqual(results.location, "test-schema-path") + self.assertEqual(results.name, "False") + self.assertEqual(len(results.columns), 3) + + results = extractor.extract() + self.assertEqual(results, None) + + def test_expands_top_level_object(self) -> None: + with patch.object(JSONSchemaExtractor, "_get_schema") as mock_get_schema: + mock_get_schema.return_value = { + "type": "object", + "title": "test-schema", + "properties": { + "name": {"type": "string"}, + "age": {"type": "integer"}, + "test-props": { + "type": "object", + "properties": { + "person-prop-1": {"type": "string"}, + "person-prop-2": {"type": "integer"}, + }, + }, + }, + } + + extractor = JSONSchemaExtractor( + "test-connection", + "test-database", + "test-schema-path", + object_expand=["test-props"], + ) + extractor.init(self.conf) + + results = extractor.extract() + self.assertEqual(len(results.columns), 5) + self.assertEqual(results.columns[2].__repr__(), ColumnMetadata("test-props", "object").__repr__()) + self.assertEqual( + results.columns[3].__repr__(), ColumnMetadata("test-props.person-prop-1", "string").__repr__() + ) + self.assertEqual( + results.columns[4].__repr__(), + ColumnMetadata("test-props.person-prop-2", "integer").__repr__(), + ) + + def test_expands_pivoted_object(self) -> None: + with patch.object(JSONSchemaExtractor, "_get_schema") as mock_get_schema: + mock_get_schema.return_value = { + "type": "object", + "title": "test-schema", + "properties": { + "name": {"type": "string"}, + "age": {"type": "integer"}, + "test-props": { + "type": "object", + "properties": { + "person-prop-1": {"type": "string"}, + "person-prop-2": {"type": "integer"}, + }, + }, + }, + "oneOf": [ + { + "properties": { + "name": {"const": "John"}, + "test-props": { + "properties": { + "person-prop-3": {"type": "float"} + } + } + } + }, + { + "properties": { + "name": {"const": "Mary"}, + "test-props": { + "properties": { + "person-prop-4": {"type": "float"} + } + } + } + } + ] + } + + extractor = JSONSchemaExtractor( + "test-connection", + "test-database", + "test-schema-path", + "name", + object_expand=["test-props"], + ) + extractor.init(self.conf) + + results = extractor.extract() + self.assertEqual(results.name, "John") + self.assertEqual(len(results.columns), 6) + self.assertEqual( + results.columns[5].__repr__(), + ColumnMetadata("test-props.person-prop-3", "float").__repr__(), + ) + + results = extractor.extract() + self.assertEqual(results.name, "Mary") + self.assertEqual(len(results.columns), 6) + self.assertEqual( + results.columns[5].name, "test-props.person-prop-4" + )