diff --git a/.github/CHANGELOG.md b/.github/CHANGELOG.md new file mode 100644 index 0000000..3d1fdc4 --- /dev/null +++ b/.github/CHANGELOG.md @@ -0,0 +1,7 @@ +## Unreleased + +- **Airbyte schema handling**: Improved `create_yml_schema.py` for both `init_setup_files_v1` and `init_setup_files_v2` to robustly handle the new Airbyte connection schema format. + - Added recursive flattening for nested `object` fields into dot-notation column names. + - Safely handle `anyOf` branches and objects that only define `additionalProperties` (no `properties`) to avoid `KeyError: 'properties'`. + - Kept existing behaviour for flat schemas that use `airbyte_type` (e.g. MySQL-style connections), ensuring backwards compatibility. + diff --git a/build.sh b/build.sh index 9e905c9..05b3ebe 100644 --- a/build.sh +++ b/build.sh @@ -18,3 +18,10 @@ docker buildx build . \ --tag europe-central2-docker.pkg.dev/fast-bi-common/bi-platform/tsb-dbt-init-core:${dbt_init_version} \ --platform linux/amd64 \ --push + +docker buildx build . \ + --pull \ + --tag 4fastbi/data-platform-init-core:dev-latest \ + --tag 4fastbi/data-platform-init-core:dev-v0.1.2 \ + --platform linux/amd64 \ + --push \ No newline at end of file diff --git a/init_setup_files_v1/create_yml_schema.py b/init_setup_files_v1/create_yml_schema.py index 91127b2..b7de0b8 100644 --- a/init_setup_files_v1/create_yml_schema.py +++ b/init_setup_files_v1/create_yml_schema.py @@ -189,124 +189,83 @@ def create_yml_dict(parsed_json, connection_id, table_name): for item in sublist if sublist != "null" ] - if tb["stream"]["jsonSchema"].get("properties"): - for col in tb["stream"]["jsonSchema"]["properties"]: + + json_schema = tb.get("stream", {}).get("jsonSchema", {}) + properties = json_schema.get("properties", {}) or {} + + def handle_object(te, path_prefix: str, schema: dict, desc_default: str = ""): + """ + Recursively flattens nested object properties into dot.notation columns. + Objects without explicit properties (only additionalProperties, etc.) are + treated as string columns to avoid KeyError on 'properties'. + """ + # If no properties – treat as scalar string + if not isinstance(schema, dict) or "properties" not in schema: + add_column(te, "columns", path_prefix, "string", desc_default) + return + + for prop_name, prop_schema in schema["properties"].items(): + col_name = f"{path_prefix}.{prop_name}" + col_desc = prop_schema.get("description", "") + col_type = prop_schema.get("type") + + if isinstance(col_type, list): + col_type = delete_null_from_list(col_type) + + # Nested object – recurse + if col_type == "object": + handle_object(te, col_name, prop_schema, col_desc) + else: + # Scalar or array or unknown type – just emit as-is + add_column(te, "columns", col_name, col_type or "string", col_desc) + + if properties: + for col, col_schema in properties.items(): + # Process column for all models for te in yml_dict["models"]: - if tb["stream"]["jsonSchema"]["properties"][col].get("airbyte_type"): - col_type = tb["stream"]["jsonSchema"]["properties"][col].get("airbyte_type") - col_desc = tb["stream"]["jsonSchema"]["properties"][col].get("description", "") + # 1) airbyte_type takes precedence when present (numeric/timestamp/etc.) + if col_schema.get("airbyte_type"): + col_type = col_schema.get("airbyte_type") + col_desc = col_schema.get("description", "") add_column(te, "columns", col, col_type, col_desc) - elif tb["stream"]["jsonSchema"]["properties"][col].get("anyOf"): - col_items = [d["properties"] for i, d in enumerate( - tb["stream"]["jsonSchema"]["properties"][col].get("anyOf")) - if "properties" in d.keys()] - if col_items: - for sub_col_obj_prop in col_items: - for item in sub_col_obj_prop: - sub_col_name = col + "." + item - sub_col_type = sub_col_obj_prop[item].get("type", "string") - sub_col_type = delete_null_from_list(sub_col_type) - sub_col_desc = sub_col_obj_prop[item].get("description", "") - add_column( - te, - "columns", - sub_col_name, - sub_col_type, - sub_col_desc, ) + + # 2) anyOf – pick object branches with properties; otherwise fall back to scalar/array + elif col_schema.get("anyOf"): + object_branches = [ + d for d in col_schema.get("anyOf", []) + if isinstance(d, dict) and "properties" in d + ] + if object_branches: + for branch in object_branches: + # treat each branch as nested object under base column name + handle_object(te, col, branch, col_schema.get("description", "")) else: - add_column(te, "columns", col, "array", "") - + # No object branches – treat as array/union as generic string + add_column(te, "columns", col, "array", col_schema.get("description", "")) + + # 3) Normal type handling else: - col_type = tb["stream"]["jsonSchema"]["properties"][col].get("type") + col_type = col_schema.get("type") if isinstance(col_type, list): col_type = delete_null_from_list(col_type) - col_desc = tb["stream"]["jsonSchema"]["properties"][col].get("description") + col_desc = col_schema.get("description", "") + + # 3a) Top-level object – flatten its properties if col_type == "object": - if tb["stream"]["jsonSchema"]["properties"][col].get("properties"): - for col_obj_prop in tb["stream"]["jsonSchema"]["properties"][col][ - "properties"]: - sub_col_name = col + "." + col_obj_prop - sub_col_type = \ - tb["stream"]["jsonSchema"]["properties"][col]["properties"][ - col_obj_prop]["type"] - sub_col_type = delete_null_from_list(sub_col_type) - sub_col_desc = \ - tb["stream"]["jsonSchema"]["properties"][col]["properties"][ - col_obj_prop].get( - "description", "") - - if sub_col_type != "object": - add_column( - te, - "columns", - sub_col_name, - sub_col_type, - sub_col_desc, ) - else: - col_info = \ - tb["stream"]["jsonSchema"]["properties"][col]["properties"][ - col_obj_prop] - if col_info.get("properties"): - for sub_col_obj_prop in col_info["properties"]: - # print(col_info["properties"][sub_col_obj_prop]) - col_sub_sub = col_info["properties"][sub_col_obj_prop].get( - "properties") - if col_sub_sub: - for i in col_sub_sub: - col_sub_sub_name = sub_col_name + "." + sub_col_obj_prop + "." + i - print(col_sub_sub_name) - col_sub_sub_type_ = col_sub_sub[i].get("type", - "string") - col_sub_sub_type = delete_null_from_list( - col_sub_sub_type_) - col_sub_sub_desc = col_sub_sub[i].get("description", - "") - add_column( - te, - "columns", - col_sub_sub_name, - col_sub_sub_type, - col_sub_sub_desc, ) - - sub_sub_col_name = (sub_col_name + "." + sub_col_obj_prop) - - sub_sub_col_type = col_info["properties"][ - sub_col_obj_prop].get( - "type", "string") - - sub_sub_col_desc = col_info["properties"][ - sub_col_obj_prop].get( - "description", "") - add_column( - te, - "columns", - sub_sub_col_name, - sub_sub_col_type, - sub_sub_col_desc, ) - else: - add_column(te, - "columns", - col, - col_type, - col_desc, ) + handle_object(te, col, col_schema, col_desc) else: - add_column( - te, - "columns", - col, - col_type, - tb["stream"]["jsonSchema"]["properties"][ - col - ].get("description", ""), - ) - - for i in te["columns"]: - if i["identifier"] in constraints: - i["constraints"] = [ - {"type": "not_null"}, - {"type": "unique"}, - {"type": "primary_key"}, - ] + # 3b) Scalar/array/etc. + add_column(te, "columns", col, col_type, col_desc) + + # Apply constraints after all columns are processed + for te in yml_dict["models"]: + for i in te["columns"]: + if i["identifier"] in constraints: + i["constraints"] = [ + {"type": "not_null"}, + {"type": "unique"}, + {"type": "primary_key"}, + ] for i in yml_dict["models"]: i["columns"].append( { diff --git a/init_setup_files_v2/create_yml_schema.py b/init_setup_files_v2/create_yml_schema.py index 09a5906..2029a2f 100644 --- a/init_setup_files_v2/create_yml_schema.py +++ b/init_setup_files_v2/create_yml_schema.py @@ -195,70 +195,84 @@ def add_model_column(col, col_name, col_type, col_desc=""): def create_source_yml_dict(tb, tb_name, prefix_with_tb_name): source_columns = [] model_columns = [] - for col in tb["stream"]["jsonSchema"]["properties"]: - if tb["stream"]["jsonSchema"]["properties"][col].get("airbyte_type"): - col_type = tb["stream"]["jsonSchema"]["properties"][ - col - ].get("airbyte_type") - col_desc = tb["stream"]["jsonSchema"]["properties"][ - col - ].get("description", "") - add_source_column(source_columns, col, col_type, col_desc) - add_model_column(model_columns, col, col_type, col_desc) - elif tb["stream"]["jsonSchema"]["properties"][col].get("anyOf"): - col_items = [ - d["properties"] - for i, d in enumerate( - tb["stream"]["jsonSchema"]["properties"][col].get("anyOf") - ) - if "properties" in d.keys() - ] - if col_items: - for sub_col_obj_prop in col_items: - for item in sub_col_obj_prop: - sub_col_name = col + "." + item - sub_col_type = sub_col_obj_prop[item].get("type", "string") - sub_col_type = delete_null_from_list(sub_col_type) - sub_col_desc = sub_col_obj_prop[item].get("description", "") - add_source_column(source_columns, sub_col_name, sub_col_type, sub_col_desc) - add_model_column(model_columns, sub_col_name, sub_col_type, sub_col_desc) - else: - add_source_column(source_columns, col, "array", "") - add_model_column(model_columns, col, "array", "") - - else: - col_type = tb["stream"]["jsonSchema"]["properties"][col].get("type") + json_schema = tb.get("stream", {}).get("jsonSchema", {}) + properties = json_schema.get("properties", {}) or {} + + def handle_object(path_prefix: str, schema: dict, desc_default: str = ""): + """ + Recursively flattens nested object properties into dot.notation columns. + Objects without explicit properties (only additionalProperties, etc.) are + treated as string columns to avoid KeyError on 'properties'. + """ + # If no properties – treat as scalar string + if not isinstance(schema, dict) or "properties" not in schema: + add_source_column(source_columns, path_prefix, "string", desc_default) + add_model_column(model_columns, path_prefix, "string", desc_default) + return + + for prop_name, prop_schema in schema["properties"].items(): + col_name = f"{path_prefix}.{prop_name}" + col_desc = prop_schema.get("description", "") + col_type = prop_schema.get("type") if isinstance(col_type, list): col_type = delete_null_from_list(col_type) + # Nested object – recurse if col_type == "object": - if "properties" in tb["stream"]["jsonSchema"]["properties"][col]: - for col_obj_prop in tb["stream"]["jsonSchema"]["properties"][col]["properties"]: - sub_col_name = col + "." + col_obj_prop - sub_col_type = tb["stream"]["jsonSchema"]["properties"][col]["properties"][col_obj_prop]["type"] - sub_col_type = delete_null_from_list(sub_col_type) - sub_col_desc = tb["stream"]["jsonSchema"]["properties"][col]["properties"][col_obj_prop].get( - "description", "") - - if sub_col_type != "object": - add_source_column(source_columns, sub_col_name, sub_col_type, sub_col_desc) - add_model_column(model_columns, sub_col_name, sub_col_type, sub_col_desc) - else: - col_info = tb["stream"]["jsonSchema"]["properties"][col]["properties"][col_obj_prop] - for sub_col_obj_prop in col_info["properties"]: - sub_sub_col_name = (sub_col_name + "." + sub_col_obj_prop) - sub_sub_col_type = col_info["properties"][sub_col_obj_prop].get("type", "string") - sub_sub_col_desc = col_info["properties"][sub_col_obj_prop].get("description", "") - add_source_column(source_columns, sub_sub_col_name, sub_sub_col_type, - sub_sub_col_desc, ) - add_model_column(model_columns, sub_sub_col_name, sub_sub_col_type, sub_sub_col_desc, ) + handle_object(col_name, prop_schema, col_desc) else: - add_source_column(source_columns, col, col_type, - tb["stream"]["jsonSchema"]["properties"][col].get("description", "")) - add_model_column(model_columns, col, col_type, - tb["stream"]["jsonSchema"]["properties"][col].get("description", "")) + # Scalar or array or unknown type – just emit as-is + add_source_column(source_columns, col_name, col_type or "string", col_desc) + add_model_column(model_columns, col_name, col_type or "string", col_desc) + + for col, col_schema in properties.items(): + # 1) airbyte_type takes precedence when present (numeric/timestamp/etc.) + if col_schema.get("airbyte_type"): + col_type = col_schema.get("airbyte_type") + col_desc = col_schema.get("description", "") + add_source_column(source_columns, col, col_type, col_desc) + add_model_column(model_columns, col, col_type, col_desc) + continue + + # 2) anyOf – pick object branches with properties; otherwise fall back to scalar/array + if col_schema.get("anyOf"): + object_branches = [ + d for d in col_schema.get("anyOf", []) if isinstance(d, dict) and "properties" in d + ] + if object_branches: + for branch in object_branches: + # treat each branch as nested object under base column name + handle_object(col, branch, col_schema.get("description", "")) + else: + # No object branches – treat as array/union as generic string + add_source_column(source_columns, col, "array", col_schema.get("description", "")) + add_model_column(model_columns, col, "array", col_schema.get("description", "")) + continue + + # 3) Normal type handling + col_type = col_schema.get("type") + if isinstance(col_type, list): + col_type = delete_null_from_list(col_type) + + # 3a) Top-level object – flatten its properties + if col_type == "object": + handle_object(col, col_schema, col_schema.get("description", "")) + else: + # 3b) Scalar/array/etc. + add_source_column( + source_columns, + col, + col_type, + col_schema.get("description", ""), + ) + add_model_column( + model_columns, + col, + col_type, + col_schema.get("description", ""), + ) constraints = [ item