Skip to content

Commit

Permalink
proto and JSON imports and more JSONS features
Browse files Browse the repository at this point in the history
Signed-off-by: Clemens Vasters <clemens@vasters.com>
  • Loading branch information
clemensv committed Feb 16, 2024
1 parent 2a1987b commit 57f0776
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 41 deletions.
17 changes: 9 additions & 8 deletions avrotize/dependency_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

def inline_dependencies_of(avro_schema, record):
for dependency in record.get('dependencies', []):
dependency_type = next((x for x in avro_schema if x['name'] == dependency), None)
dependency_type = next((x for x in avro_schema if x['name'] == dependency or x.get('namespace','')+'.'+x['name'] == dependency), None)
if not dependency_type:
continue
deps = record.get('dependencies', [])
for field in record['fields']:
if record['name'] != dependency:
swap_dependency_type(avro_schema, field, dependency, dependency_type, deps)
record['dependencies'] = [dep for dep in deps if dep != record['name']]
record['dependencies'] = [dep for dep in deps if dep != record['name'] and record.get('namespace','')+'.'+record['name'] != dep]
if 'dependencies' in record:
del record['dependencies']

Expand All @@ -20,7 +20,8 @@ def sort_messages_by_dependencies(avro_schema):
while avro_schema:
found = False
for record in avro_schema:
if not any(record['name'] in other_record.get('dependencies', []) for other_record in avro_schema):
if not any(record.get('name') in other_record.get('dependencies', [])
or (record.get('namespace','')+'.'+record.get('name')) in other_record.get('dependencies', []) for other_record in avro_schema):
if 'dependencies' in record:
del record['dependencies']
sorted_messages.append(record)
Expand All @@ -36,14 +37,14 @@ def sort_messages_by_dependencies(avro_schema):

def swap_record_dependencies(avro_schema, record):
for dependency in record.get('dependencies', []):
dependency_type = next((x for x in avro_schema if x['name'] == dependency), None)
dependency_type = next((x for x in avro_schema if x['name'] == dependency or x.get('namespace','')+'.'+x['name'] == dependency), None)
if not dependency_type:
continue
deps = record.get('dependencies', [])
for field in record['fields']:
if record['name'] != dependency:
if record['name'] != dependency and (record.get('namespace','')+'.'+record['name']) != dependency:
swap_dependency_type(avro_schema, field, dependency, dependency_type, deps)
record['dependencies'] = [dep for dep in deps if dep != record['name']]
record['dependencies'] = [dep for dep in deps if dep != record['name'] and record.get('namespace','')+'.'+record['name'] != dep]

def swap_dependency_type(avro_schema, field, dependency, dependency_type, dependencies):
""" to break circular dependencies, we will inline the dependent record and remove the dependency """
Expand Down Expand Up @@ -76,8 +77,8 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']
# type is an object?
elif type(field_type) is dict and 'type' in field_type and field_type['name'] != dependency:
swap_dependency_type(avro_schema, field_type, dependency, dependency_type)
elif type(field_type) is dict and field_type.get('type') != None and field_type.get('name') == dependency:
swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies)
elif 'type' in field['type']:
swap_dependency_type(avro_schema, field['type'], dependency, dependency_type, dependencies)
elif field['type'] == 'array':
Expand Down
227 changes: 197 additions & 30 deletions avrotize/jsonstoavro.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import json
from avrotize.dependency_resolver import sort_messages_by_dependencies
import os
import re
import jsonpointer
import requests
from avrotize.dependency_resolver import inline_dependencies_of, sort_messages_by_dependencies
from urllib.parse import ParseResult, urljoin, urlparse


def json_schema_primitive_to_avro_type(json_primitive: str, format: str, enum: list, field_name: str, dependencies: list) -> str:
Expand Down Expand Up @@ -48,30 +53,133 @@ def merge_schemas(schemas):
merged_schema['fields'].extend(schema['fields'])
return merged_schema

imported_types = {}

def json_type_to_avro_type(json_type: str | dict, field_name: str, namespace : str, dependencies: list) -> dict:
def fetch_content(url: str | ParseResult):
# Parse the URL to determine the scheme
if isinstance(url, str):
parsed_url = urlparse(url)
else:
parsed_url = url
scheme = parsed_url.scheme

# Handle HTTP and HTTPS URLs
if scheme in ['http', 'https']:
try:
response = requests.get(url)
response.raise_for_status() # Raises an HTTPError if the response status code is 4XX/5XX
return response.text
except requests.RequestException as e:
return f'Error fetching {url}: {e}'

# Handle file URLs
elif scheme == 'file':
# Remove the leading 'file://' from the path for compatibility
file_path = parsed_url.netloc
# On Windows, a file URL might start with a '/' but it's not part of the actual path
if os.name == 'nt' and file_path.startswith('/'):
file_path = file_path[1:]
try:
with open(file_path, 'r') as file:
return file.read()
except Exception as e:
return f'Error reading file at {file_path}: {e}'

else:
return f'Unsupported URL scheme: {scheme}'

def resolve_reference(json_type: dict, base_uri: str, json_doc: dict):
"""Resolve a JSON Pointer reference or a JSON $ref reference."""
ref = json_type['$ref']
content = None
url = urlparse(ref)
if url.scheme:
content = fetch_content(ref)
elif url.path:
file_uri = urljoin(base_uri, url.path)
content = fetch_content(file_uri)
if content:
try:
json_schema = json.loads(content)
# resolve the JSON Pointer reference, if any
if url.fragment:
json_schema = jsonpointer.resolve_pointer(json_schema, url.fragment)
imported_types[ref] = json_schema
return json_schema
except json.JSONDecodeError:
raise Exception(f'Error decoding JSON from {ref}')

if url.fragment:
json_pointer = url.fragment
ref_schema = jsonpointer.resolve_pointer(json_doc, json_pointer)
if ref_schema:
imported_types[ref] = ref_schema
return ref_schema
return json_type



def json_type_to_avro_type(json_type: str | dict, field_name: str, namespace : str, dependencies: list, json_schema: dict, base_uri: str, avro_schema: list) -> dict:
"""Convert a JSON type to Avro type."""
if isinstance(json_type, dict):
if '$ref' in json_type:
ref = json_type['$ref']
if ref in imported_types:
return imported_types[ref]
else:
new_base_uri = urljoin(base_uri, json_type['$ref'])
resolved_json_type = resolve_reference(json_type, base_uri, json_schema)
if len(json_type) == 1:
# it's a standalone reference, so will import the type into the schema
# and reference it like it was in the same file
parsed_ref = urlparse(ref)
if parsed_ref.fragment:
field_name = parsed_ref.fragment.split('/')[-1]
avro_type = json_type_to_avro_type(resolved_json_type, field_name, namespace, dependencies, json_schema, new_base_uri, avro_schema)
existing_type = next((t for t in avro_schema if t.get('name') == avro_type['name'] and t.get('namespace') == avro_type.get('namespace') ), None)
if not existing_type:
avro_schema.append(avro_type)
imported_types[ref] = avro_type['name']
dependencies.append(avro_type['name'])
avro_type = avro_type['name']
else:
# it's a reference within a definition, so we will turn this into an inline type
json_type.update(resolved_json_type)
del json_type['$ref']
avro_type = json_type_to_avro_type(json_type, field_name, namespace, dependencies, json_schema, new_base_uri, avro_schema)
imported_types[ref] = avro_type['name']
return avro_type

t = json_type.get('type', 'object')
if t == 'array':
avro_type = {"type": "array", "items": json_type_to_avro_type(json_type['items'], field_name, namespace, dependencies)}
avro_type = {"type": "array", "items": json_type_to_avro_type(json_type['items'], field_name, namespace, dependencies, json_schema, base_uri, avro_schema)}
elif 'oneOf' in json_type:
avro_type = [json_type_to_avro_type(one_type, field_name, namespace, dependencies) for one_type in json_type['oneOf']]
avro_type = [json_type_to_avro_type(one_type, field_name, namespace, dependencies, json_schema, base_uri, avro_schema) for one_type in json_type['oneOf']]
elif 'allOf' in json_type:
avro_type = merge_schemas([json_type_to_avro_type(schema, field_name, namespace, dependencies) for schema in json_type['allOf']])
avro_type = merge_schemas([json_type_to_avro_type(schema, field_name, namespace, dependencies, json_schema, base_uri, avro_schema) for schema in json_type['allOf']])
elif 'anyOf' in json_type:
avro_type = merge_schemas([json_type_to_avro_type(schema, field_name, namespace, dependencies, json_schema, base_uri, avro_schema) for schema in json_type['anyOf']])
elif t == 'object':
avro_type = json_schema_object_to_avro_record(field_name, json_type, namespace)
avro_type = json_schema_object_to_avro_record(field_name, json_type, namespace, json_schema, base_uri, avro_schema)
else:
avro_type = json_schema_primitive_to_avro_type(t, None, None, field_name, dependencies)
else:
avro_type = json_schema_primitive_to_avro_type(json_type, None, None, field_name, dependencies)

if 'name' in avro_type:
existing_type = next((t for t in avro_schema if t.get('name') == avro_type['name'] and t.get('namespace') == avro_type.get('namespace') ), None)
if existing_type:
return existing_type.get('name')

return avro_type

def json_schema_object_to_avro_record(name: str, json_object: dict, namespace) -> dict:
def json_schema_object_to_avro_record(name: str, json_object: dict, namespace: str, json_schema: dict, base_uri: str, avro_schema: list) -> dict:
"""Convert a JSON schema object declaration to an Avro record."""

dependencies = []
title = json_object.get('title')
if title:
title = title.replace(" ", "_")
avro_record = {
'type': 'record',
'name': title if title else name if name else 'record',
Expand All @@ -82,7 +190,7 @@ def json_schema_object_to_avro_record(name: str, json_object: dict, namespace) -
required_fields = json_object.get('required', [])
if 'properties' in json_object:
for field_name, field in json_object['properties'].items():
avro_field_type = json_type_to_avro_type(field, field_name, namespace, dependencies)
avro_field_type = json_type_to_avro_type(field, field_name, namespace, dependencies, json_schema, base_uri, avro_schema)

if avro_field_type is None:
raise ValueError(f"avro_field_type is None for field {field_name}")
Expand All @@ -98,50 +206,109 @@ def json_schema_object_to_avro_record(name: str, json_object: dict, namespace) -
avro_record["fields"].append(avro_field)
if len(dependencies) > 0:
avro_record['dependencies'] = dependencies

if 'additionalProperties' in json_object and isinstance(json_object['additionalProperties'], dict):
additional_props = json_object['additionalProperties']
avro_record['fields'].append({"name": "additionalProperties", "type": "map", "values": json_type_to_avro_type(additional_props, "additionalProperties", namespace, dependencies, json_schema, base_uri, avro_schema)})
elif 'patternProperties' in json_object and isinstance(json_object['patternProperties'], dict):
pattern_props = json_object['patternProperties']
prop_types = []
for pattern, props in pattern_props.items():
pattern = re.sub(r'[^a-zA-Z0-9_]', '_', pattern)
prop_types.append(json_type_to_avro_type(props, pattern, namespace, dependencies, json_schema, base_uri, avro_schema))
avro_record = {
"type": "map",
"name": title if title else name if name else 'record',
"namespace": namespace,
"values": prop_types[0] if len(prop_types) == 1 else prop_types
}
else:
avro_record = {
"type": "map",
"values": [
"null",
"boolean",
"double",
"string"
]
}
if 'additionalProperties' in json_object and isinstance(json_object['additionalProperties'], dict):
additional_props = json_object['additionalProperties']
avro_record = {
"type": "map",
"name": title if title else name if name else 'record',
"namespace": namespace,
"values": json_type_to_avro_type(additional_props, "additionalProperties", namespace, dependencies, json_schema, base_uri, avro_schema)
}
elif 'patternProperties' in json_object and isinstance(json_object['patternProperties'], dict):
pattern_props = json_object['patternProperties']
prop_types = []
for pattern, props in pattern_props.items():
pattern = re.sub(r'[^a-zA-Z0-9_]', '_', pattern)
prop_types.append(json_type_to_avro_type(props, pattern, namespace, dependencies, json_schema, base_uri, avro_schema))
avro_record = {
"type": "map",
"name": title if title else name if name else 'record',
"namespace": namespace,
"values": prop_types[0] if len(prop_types) == 1 else prop_types
}
else:
avro_record = {
"type": "map",
"name": title if title else name if name else "empty",
"values": [
"null",
"boolean",
"double",
"string"
]
}

if 'description' in json_object:
avro_record['doc'] = json_object['description']
return avro_record


def jsons_to_avro(json_schema: dict | list, namespace: str) -> list:
def jsons_to_avro(json_schema: dict | list, namespace: str, base_uri: str) -> list:
"""Convert a JSON-schema to an Avro-schema."""
avro_schema = []

# check whether this is indeed a swagger file and then grab the definitions section
if 'swagger' in json_schema:
json_schema = json_schema.get('definitions', {})
if not json_schema:
parsed_url = urlparse(base_uri)
if parsed_url.fragment:
json_pointer = parsed_url.fragment
schema_name = parsed_url.fragment.split('/')[-1]
schema = jsonpointer.resolve_pointer(json_schema, json_pointer)
avro_schema_item = json_schema_object_to_avro_record(schema_name, schema, namespace, json_schema, base_uri, avro_schema)
avro_schema.append(avro_schema_item)
inline_dependencies_of(avro_schema, avro_schema_item)
return avro_schema
elif 'swagger' in json_schema:
json_schema_defs = json_schema.get('definitions', {})
if not json_schema_defs:
raise ValueError('No definitions found in swagger file')
for schema_name, schema in json_schema.items():
avro_schema_item = json_schema_object_to_avro_record(schema_name, schema, namespace)
avro_schema_item['name'] = schema_name
avro_schema.append(avro_schema_item)
for schema_name, schema in json_schema_defs.items():
avro_schema_item = json_schema_object_to_avro_record(schema_name, schema, namespace, json_schema, base_uri, avro_schema)
existing_type = next((t for t in avro_schema if t.get('name') == avro_schema_item['name'] and t.get('namespace') == avro_schema_item.get('namespace') ), None)
if not existing_type:
avro_schema_item['name'] = schema_name
avro_schema.append(avro_schema_item)
return sort_messages_by_dependencies(avro_schema)
else:
if not isinstance(json_schema, list):
json_schema = [json_schema]

for schema in json_schema:
if schema['type'] == 'object':
avro_schema.append(json_schema_object_to_avro_record(None, schema, namespace))
avro_type = json_schema_object_to_avro_record(None, schema, namespace, json_schema, base_uri, avro_schema)
existing_type = next((t for t in avro_schema if t.get('name') == avro_type['name'] and t.get('namespace') == avro_type.get('namespace') ), None)
if not existing_type:
avro_schema.append(avro_type)
return sort_messages_by_dependencies(avro_schema)

def convert_jsons_to_avro(json_schema_file_path: str, avro_schema_path: str, namespace: str = None) -> list:
"""Convert JSON schema file to Avro schema file."""
with open(json_schema_file_path, 'r') as schema_file:
json_schema = json.load(schema_file)
avro_schema = jsons_to_avro(json_schema, namespace)
# turn the file path into a file URI if it's not a URI already
parsed_url = urlparse(json_schema_file_path)
if not parsed_url.hostname and not parsed_url.scheme == "file":
json_schema_file_path = 'file://' + json_schema_file_path
parsed_url = urlparse(json_schema_file_path)
content = fetch_content(parsed_url.geturl())
json_schema = json.loads(content)
# drop the file name from the parsed URL to get the base URI
avro_schema = jsons_to_avro(json_schema, namespace, parsed_url.geturl())
if len(avro_schema) == 1:
avro_schema = avro_schema[0]
with open(avro_schema_path, 'w') as avro_file:
json.dump(avro_schema, avro_file, indent=4)
return avro_schema
2 changes: 1 addition & 1 deletion avrotize/prototoavro.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def convert_proto_to_avro_schema(proto_file_path: str) -> list:
imported_types[t["namespace"]+"."+t["name"]] = t
else:
# find the path relative to the current directory
cwd = os.getcwd()
cwd = os.path.join(os.getcwd(),os.path.dirname(proto_file_path))
import_path = os.path.join(cwd, import_)
# raise an exception if the imported file does not exist
if not os.path.exists(import_path):
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ dependencies = [
"jsonschema>=4.17.3",
"lark>=1.1.9",
"pyarrow>=15.0.0",
"asn1tools>=0.166.0"
"asn1tools>=0.166.0",
"jsonpointer>=2.4"
]

[project.scripts]
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ requests==2.31.0
tomli_w==1.0.0
urllib3==2.2.0
pyarrow==15.0.0
asn1tools==0.166.0
asn1tools==0.166.0
jsonpointer==2.4
9 changes: 9 additions & 0 deletions test/jsons/addlprops1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Only Additional Properties",
"type": "object",
"additionalProperties": {
"type": "string"
}
}

Loading

0 comments on commit 57f0776

Please sign in to comment.