Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Unreleased

- **Airbyte schema handling**: Improved `create_yml_schema.py` for both `init_setup_files_v1` and `init_setup_files_v2` to robustly handle the new Airbyte connection schema format.
- Added recursive flattening for nested `object` fields into dot-notation column names.
- Safely handle `anyOf` branches and objects that only define `additionalProperties` (no `properties`) to avoid `KeyError: 'properties'`.
- Kept existing behaviour for flat schemas that use `airbyte_type` (e.g. MySQL-style connections), ensuring backwards compatibility.

7 changes: 7 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ docker buildx build . \
--tag europe-central2-docker.pkg.dev/fast-bi-common/bi-platform/tsb-dbt-init-core:${dbt_init_version} \
--platform linux/amd64 \
--push

docker buildx build . \
--pull \
--tag 4fastbi/data-platform-init-core:dev-latest \
--tag 4fastbi/data-platform-init-core:dev-v0.1.2 \
--platform linux/amd64 \
--push
179 changes: 69 additions & 110 deletions init_setup_files_v1/create_yml_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,124 +189,83 @@ def create_yml_dict(parsed_json, connection_id, table_name):
for item in sublist
if sublist != "null"
]
if tb["stream"]["jsonSchema"].get("properties"):
for col in tb["stream"]["jsonSchema"]["properties"]:

json_schema = tb.get("stream", {}).get("jsonSchema", {})
properties = json_schema.get("properties", {}) or {}

def handle_object(te, path_prefix: str, schema: dict, desc_default: str = ""):
"""
    Recursively flattens nested object properties into dot-notation columns.
Objects without explicit properties (only additionalProperties, etc.) are
treated as string columns to avoid KeyError on 'properties'.
"""
# If no properties – treat as scalar string
if not isinstance(schema, dict) or "properties" not in schema:
add_column(te, "columns", path_prefix, "string", desc_default)
return

for prop_name, prop_schema in schema["properties"].items():
col_name = f"{path_prefix}.{prop_name}"
col_desc = prop_schema.get("description", "")
col_type = prop_schema.get("type")

if isinstance(col_type, list):
col_type = delete_null_from_list(col_type)

# Nested object – recurse
if col_type == "object":
handle_object(te, col_name, prop_schema, col_desc)
else:
# Scalar or array or unknown type – just emit as-is
add_column(te, "columns", col_name, col_type or "string", col_desc)

if properties:
for col, col_schema in properties.items():
# Process column for all models
for te in yml_dict["models"]:
if tb["stream"]["jsonSchema"]["properties"][col].get("airbyte_type"):
col_type = tb["stream"]["jsonSchema"]["properties"][col].get("airbyte_type")
col_desc = tb["stream"]["jsonSchema"]["properties"][col].get("description", "")
# 1) airbyte_type takes precedence when present (numeric/timestamp/etc.)
if col_schema.get("airbyte_type"):
col_type = col_schema.get("airbyte_type")
col_desc = col_schema.get("description", "")
add_column(te, "columns", col, col_type, col_desc)
elif tb["stream"]["jsonSchema"]["properties"][col].get("anyOf"):
col_items = [d["properties"] for i, d in enumerate(
tb["stream"]["jsonSchema"]["properties"][col].get("anyOf"))
if "properties" in d.keys()]
if col_items:
for sub_col_obj_prop in col_items:
for item in sub_col_obj_prop:
sub_col_name = col + "." + item
sub_col_type = sub_col_obj_prop[item].get("type", "string")
sub_col_type = delete_null_from_list(sub_col_type)
sub_col_desc = sub_col_obj_prop[item].get("description", "")
add_column(
te,
"columns",
sub_col_name,
sub_col_type,
sub_col_desc, )

# 2) anyOf – pick object branches with properties; otherwise fall back to scalar/array
elif col_schema.get("anyOf"):
object_branches = [
d for d in col_schema.get("anyOf", [])
if isinstance(d, dict) and "properties" in d
]
if object_branches:
for branch in object_branches:
# treat each branch as nested object under base column name
handle_object(te, col, branch, col_schema.get("description", ""))
else:
add_column(te, "columns", col, "array", "")

                            # No object branches in anyOf – fall back to a generic 'array' column
add_column(te, "columns", col, "array", col_schema.get("description", ""))

# 3) Normal type handling
else:
col_type = tb["stream"]["jsonSchema"]["properties"][col].get("type")
col_type = col_schema.get("type")
if isinstance(col_type, list):
col_type = delete_null_from_list(col_type)
col_desc = tb["stream"]["jsonSchema"]["properties"][col].get("description")
col_desc = col_schema.get("description", "")

# 3a) Top-level object – flatten its properties
if col_type == "object":
if tb["stream"]["jsonSchema"]["properties"][col].get("properties"):
for col_obj_prop in tb["stream"]["jsonSchema"]["properties"][col][
"properties"]:
sub_col_name = col + "." + col_obj_prop
sub_col_type = \
tb["stream"]["jsonSchema"]["properties"][col]["properties"][
col_obj_prop]["type"]
sub_col_type = delete_null_from_list(sub_col_type)
sub_col_desc = \
tb["stream"]["jsonSchema"]["properties"][col]["properties"][
col_obj_prop].get(
"description", "")

if sub_col_type != "object":
add_column(
te,
"columns",
sub_col_name,
sub_col_type,
sub_col_desc, )
else:
col_info = \
tb["stream"]["jsonSchema"]["properties"][col]["properties"][
col_obj_prop]
if col_info.get("properties"):
for sub_col_obj_prop in col_info["properties"]:
# print(col_info["properties"][sub_col_obj_prop])
col_sub_sub = col_info["properties"][sub_col_obj_prop].get(
"properties")
if col_sub_sub:
for i in col_sub_sub:
col_sub_sub_name = sub_col_name + "." + sub_col_obj_prop + "." + i
print(col_sub_sub_name)
col_sub_sub_type_ = col_sub_sub[i].get("type",
"string")
col_sub_sub_type = delete_null_from_list(
col_sub_sub_type_)
col_sub_sub_desc = col_sub_sub[i].get("description",
"")
add_column(
te,
"columns",
col_sub_sub_name,
col_sub_sub_type,
col_sub_sub_desc, )

sub_sub_col_name = (sub_col_name + "." + sub_col_obj_prop)

sub_sub_col_type = col_info["properties"][
sub_col_obj_prop].get(
"type", "string")

sub_sub_col_desc = col_info["properties"][
sub_col_obj_prop].get(
"description", "")
add_column(
te,
"columns",
sub_sub_col_name,
sub_sub_col_type,
sub_sub_col_desc, )
else:
add_column(te,
"columns",
col,
col_type,
col_desc, )
handle_object(te, col, col_schema, col_desc)
else:
add_column(
te,
"columns",
col,
col_type,
tb["stream"]["jsonSchema"]["properties"][
col
].get("description", ""),
)

for i in te["columns"]:
if i["identifier"] in constraints:
i["constraints"] = [
{"type": "not_null"},
{"type": "unique"},
{"type": "primary_key"},
]
# 3b) Scalar/array/etc.
add_column(te, "columns", col, col_type, col_desc)

# Apply constraints after all columns are processed
for te in yml_dict["models"]:
for i in te["columns"]:
if i["identifier"] in constraints:
i["constraints"] = [
{"type": "not_null"},
{"type": "unique"},
{"type": "primary_key"},
]
for i in yml_dict["models"]:
i["columns"].append(
{
Expand Down
Loading