diff --git a/avrotize/common.py b/avrotize/common.py index 2325725..63c3bca 100644 --- a/avrotize/common.py +++ b/avrotize/common.py @@ -38,4 +38,46 @@ def generic_type() -> list[str | dict]: "type": "map", "values": l2 }]) - return l1 \ No newline at end of file + return l1 + +def find_schema_node(test, avro_schema, recursion_stack = []): + """Find the first schema node in the avro_schema matching the test""" + for recursion_item in recursion_stack: + if avro_schema is recursion_item: + raise ValueError('Cyclical reference detected in schema') + if len(recursion_stack) > 30: + raise ValueError('Maximum recursion depth 30 exceeded in schema') + try: + recursion_stack.append(avro_schema) + if isinstance(avro_schema, dict): + test_node = test(avro_schema) + if test_node: + return avro_schema + for k, v in avro_schema.items(): + if isinstance(v, (dict,list)): + node = find_schema_node(test, v, recursion_stack) + if node: + return node + elif isinstance(avro_schema, list): + for item in avro_schema: + node = find_schema_node(test, item, recursion_stack) + if node: + return node + return None + finally: + recursion_stack.pop() + +def set_schema_node(test, replacement, avro_schema): + """Set the first schema node in the avro_schema matching the test to the replacement""" + if isinstance(avro_schema, dict): + test_node = test(avro_schema) + if test_node: + avro_schema.clear() + avro_schema.update(replacement) + return + for k, v in avro_schema.items(): + if isinstance(v, (dict,list)): + set_schema_node(test, replacement, v) + elif isinstance(avro_schema, list): + for item in avro_schema: + set_schema_node(test, replacement, item) \ No newline at end of file diff --git a/avrotize/dependency_resolver.py b/avrotize/dependency_resolver.py index e61d733..c7f76ec 100644 --- a/avrotize/dependency_resolver.py +++ b/avrotize/dependency_resolver.py @@ -4,15 +4,112 @@ from typing import List + +def adjust_resolved_dependencies(avro_schema: List[dict] | dict): + """ + After resolving dependencies, it may still be necessary to adjust them. The + first pass of the algorithms below does inline all dependent types, but + the resulting document may still have fields defined before the types they + depend on because of the order in which the resolution happened, which necessarily + re-sorts the graph. This function will recursively adjust the resolved + dependencies until all record types have their dependency types defined before them. + """ + + class TreeWalker: + + def __init__(self): + self.found_something = True + + def swap_record_dependencies_above(self, current_node, record, avro_schema) -> str | None: + """ swap the first reference to of the record type above the record in avro_schema """ + if isinstance(current_node, dict): + if 'name' in current_node and 'namespace' in current_node and 'type' in current_node and \ + current_node['name'] == record['name'] and current_node.get('namespace','') == record.get('namespace','') and current_node['type'] == record['type']: + # we reached the record again. we stop here. 
+ return None + for k, v in current_node.items(): + if k in ['dependencies', 'unmerged_types']: + continue + if isinstance(v, (dict,list)): + return self.swap_record_dependencies_above(v, record, avro_schema) + elif isinstance(v, str): + if k not in ['type', 'values', 'items']: + continue + qname = record.get('namespace','')+'.'+record['name'] + if v == qname: + self.found_something = True + current_node[k] = copy.deepcopy(record) + return qname + elif isinstance(current_node, list): + for item in current_node: + if isinstance(item, (dict,list)): + return self.swap_record_dependencies_above(item, record, avro_schema) + elif isinstance(item, str): + qname = record.get('namespace','')+'.'+record['name'] + if item == qname: + self.found_something = True + idx = current_node.index(item) + current_node.remove(item) + current_node.insert(idx, copy.deepcopy(record)) + return qname + return None + + def walk_schema(self, current_node, avro_schema, record_list) -> str | None: + found_record = None + if isinstance(current_node, dict): + if 'type' in current_node and (current_node['type'] == 'record' or current_node['type'] == 'enum'): + current_qname = current_node.get('namespace','')+'.'+current_node.get('name','') + if current_qname in record_list: + self.found_something = True + return current_qname + record_list.append(current_qname) + found_record = self.swap_record_dependencies_above(avro_schema, current_node, avro_schema) + for k, v in current_node.items(): + if isinstance(v, (dict,list)): + qname = self.walk_schema(v, avro_schema, record_list) + if qname: + self.found_something = True + current_node[k] = qname + elif isinstance(current_node, list): + for item in current_node: + qname = self.walk_schema(item, avro_schema, record_list) + if qname: + self.found_something = True + idx = current_node.index(item) + current_node.remove(item) + current_node.insert(idx, qname) + # dedupe the list + new_list = [] + for item in current_node: + if not item in new_list: + new_list.append(item) + current_node.clear() + current_node.extend(new_list) + return found_record + + # while we've got work to do + tree_walker = TreeWalker() + while True: + tree_walker.found_something = False + tree_walker.walk_schema(avro_schema, avro_schema, []) + if not tree_walker.found_something: + break + + + def inline_dependencies_of(avro_schema, record): - for dependency in record.get('dependencies', []): + """ to break circular dependencies, we will inline all dependent record """ + for dependency in copy.deepcopy(record.get('dependencies', [])): dependency_type = next((x for x in avro_schema if x['name'] == dependency or x.get('namespace','')+'.'+x['name'] == dependency), None) if not dependency_type: continue deps = record.get('dependencies', []) for field in record['fields']: swap_dependency_type(avro_schema, field, dependency, dependency_type, deps, [record['namespace']+'.'+record['name']]) - del record['dependencies'] + if 'dependencies' in record: + del record['dependencies'] + + adjust_resolved_dependencies(record) @@ -96,6 +193,8 @@ def sort_messages_by_dependencies(avro_schema): if not found: print('WARNING: There are circular dependencies in the schema, unable to resolve them: {}'.format([x['name'] for x in avro_schema if isinstance(x, dict) and 'dependencies' in x])) + + adjust_resolved_dependencies(sorted_messages) return sorted_messages def swap_record_dependencies(avro_schema, record, record_stack: List[str], recursion_depth: int = 0): @@ -169,11 +268,15 @@ def swap_dependency_type(avro_schema, field, 
dependency, dependency_type, depend for field_type in field['type']: if field_type == dependency: if dependency_type in avro_schema: + index = field['type'].index(field_type) field['type'].remove(field_type) - field['type'].append(dependency_type) + field['type'].insert(index, dependency_type) avro_schema.remove(dependency_type) - dependencies.remove(dependency) + if dependency in dependencies: + dependencies.remove(dependency) dependencies.extend(dependency_type.get('dependencies', [])) + if 'dependencies' in dependency_type: + swap_record_dependencies(avro_schema, dependency_type, record_stack, recursion_depth + 1) for field_type in field['type']: if isinstance(field_type, dict): swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies, record_stack, recursion_depth + 1) @@ -186,11 +289,15 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend for item in field['items']: if item == dependency: if dependency_type in avro_schema: + index = field['items'].index(item) field['items'].remove(item) - field['items'].append(dependency_type) + field['items'].insert(index, dependency_type) avro_schema.remove(dependency_type) - dependencies.remove(dependency) + if dependency in dependencies: + dependencies.remove(dependency) dependencies.extend(dependency_type.get('dependencies', [])) + if 'dependencies' in dependency_type: + swap_record_dependencies(avro_schema, dependency_type, record_stack) for item in field['items']: if isinstance(item, dict): swap_dependency_type(avro_schema, item, dependency, dependency_type, dependencies, record_stack, recursion_depth + 1) @@ -198,7 +305,8 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend if dependency_type in avro_schema: field['items'] = dependency_type avro_schema.remove(dependency_type) - dependencies.remove(dependency) + if dependency in dependencies: + dependencies.remove(dependency) dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type, record_stack) @@ -209,19 +317,24 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend for item in field['values']: if item == dependency: if dependency_type in avro_schema: + index = field['values'].index(item) field['values'].remove(item) - field['values'].append(dependency_type) + field['values'].insert(index, dependency_type) avro_schema.remove(dependency_type) - dependencies.remove(dependency) + if dependency in dependencies: + dependencies.remove(dependency) dependencies.extend(dependency_type.get('dependencies', [])) + if 'dependencies' in dependency_type: + swap_record_dependencies(avro_schema, dependency_type, record_stack) for item in field['values']: if isinstance(item, dict): swap_dependency_type(avro_schema, item, dependency, dependency_type, dependencies, record_stack, recursion_depth + 1) if field['values'] == dependency: if dependency_type in avro_schema: field['values'] = dependency_type - avro_schema.remove(dependency_type) - dependencies.remove(dependency) + avro_schema.remove(dependency_type) + if dependency in dependencies: + dependencies.remove(dependency) dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type, record_stack) diff --git a/avrotize/jsonstoavro.py b/avrotize/jsonstoavro.py index 2712b4a..6889040 100644 --- a/avrotize/jsonstoavro.py +++ b/avrotize/jsonstoavro.py @@ -8,10 
+8,13 @@ from jsonpointer import JsonPointerException import requests import copy -from avrotize.common import avro_name, generic_type +from avrotize.common import avro_name, find_schema_node, generic_type, set_schema_node from avrotize.dependency_resolver import inline_dependencies_of, sort_messages_by_dependencies from urllib.parse import ParseResult, urlparse, unquote + +primitive_types = ['null', 'string', 'int', 'long', 'float', 'double', 'boolean', 'bytes'] + class JsonToAvroConverter: """ Converts JSON schema to Avro schema. @@ -227,15 +230,27 @@ def merge_json_schemas(self, json_schemas: list[dict], intersect: bool = False) if isinstance(merged_type['type'], str): merged_type['type'] = [merged_type['type']] merged_type['type'].append(json_schema['type']) - if 'required' in json_schema and 'required' in merged_type: - merged_type['required'] = list(set(merged_type['required']).union(set(json_schema['required']))) + if 'required' in json_schema: + if 'required' in merged_type: + merged_type['required'] = list(set(merged_type['required']).union(set(json_schema['required']))) + else: + merged_type['required'] = json_schema['required'] merged_type['name'] = merged_type.get('name','') + json_schema.get('name','') - if 'properties' in json_schema and 'properties' in merged_type: - merged_type['properties'].update(json_schema['properties']) - if 'enum' in json_schema and 'enum' in merged_type: - merged_type['enum'] = list(set(merged_type['enum']).union(set(json_schema['enum']))) - if 'format' in json_schema and 'format' in merged_type: - merged_type['format'] = merged_type['format'] + json_schema['format'] + if 'properties' in json_schema: + if 'properties' in merged_type: + merged_type['properties'].update(json_schema['properties']) + else: + merged_type['properties'] = json_schema['properties'] + if 'enum' in json_schema: + if 'enum' in merged_type: + merged_type['enum'] = list(set(merged_type['enum']).union(set(json_schema['enum']))) + else: + merged_type['enum'] = json_schema['enum'] + if 'format' in json_schema: + if 'format' in merged_type: + merged_type['format'] = merged_type['format'] + json_schema['format'] + else: + merged_type['format'] = json_schema['format'] if intersect: # only keep the intersection of the required fields @@ -789,7 +804,19 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ if json_object_type or 'enum' in json_type: if json_object_type == 'array': if isinstance(json_type, dict) and 'items' in json_type: - avro_type = self.merge_avro_schemas([avro_type, {'type': 'array', 'items': self.json_type_to_avro_type(json_type['items'], record_name, field_name, namespace, dependencies, json_schema, base_uri, avro_schema, record_stack, recursion_depth + 1)}], avro_schema, avro_type.get('name', local_name) if isinstance(avro_type,dict) else local_name) + deps = [] + item_type = self.json_type_to_avro_type(json_type['items'], record_name, field_name, namespace, deps, json_schema, base_uri, avro_schema, record_stack, recursion_depth + 1) + if item_type and isinstance(item_type, dict) and 'type' in item_type and (item_type['type'] == 'record' or item_type['type'] == 'enum'): + item_type['dependencies'] = deps + self.register_type(avro_schema, item_type) + dependencies.append(item_type['namespace'] + '.' 
+ item_type['name']) + else: + dependencies.extend(deps) + if isinstance(item_type, dict) and not 'type' in item_type: + item_type = generic_type() + if isinstance(item_type, str) and not item_type in primitive_types: + dependencies.append(item_type) + avro_type = self.merge_avro_schemas([avro_type, {'type': 'array', 'items': item_type}], avro_schema, avro_type.get('name', local_name) if isinstance(avro_type,dict) else local_name) else: avro_type = self.merge_avro_schemas([avro_type, {'type': 'array', 'items': generic_type()}], avro_schema, avro_type.get('name', local_name) if isinstance(avro_type,dict) else local_name) elif json_object_type and (json_object_type == 'object' or 'object' in json_object_type): @@ -811,7 +838,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ 'type': 'enum', 'symbols': enum, 'name': local_name, - 'namespace': namespace + '.' + record_name + 'namespace': namespace + '.' + record_name + '_types' } else: avro_type = self.json_schema_primitive_to_avro_type(json_object_type, json_type.get('format'), json_type.get('enum'), record_name, field_name, namespace, dependencies) @@ -866,8 +893,12 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp if isinstance(json_object, dict) and ('allOf' in json_object or 'oneOf' in json_object or 'anyOf' in json_object): # we will merge allOf, oneOf, anyOf into a union record type type = self.json_type_to_avro_type(json_object, name, 'value', namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) - if isinstance(type, list) or isinstance(type, str): + if isinstance(type, str): + # we are skipping references and primitives + return None + if isinstance(type, list): # we should have a union type + print(f'INFO: Standalone type {name} is being wrapped in a record') type = { 'type': 'record', 'name': avro_name(name), @@ -881,6 +912,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp } elif isinstance(type, dict) and 'type' in type and type['type'] != 'record': # merge the type into a record type if it's not a record type + print(f'INFO: Standalone type {name} is being wrapped in a record') new_type = { 'type': 'record', 'name': avro_name(type.get('name',name)+'_wrapper'), @@ -918,22 +950,27 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp if isinstance(json_object, dict) and 'type' in json_object and json_object['type'] == 'array': # this is an array, which can't be standalone in Avro, so we will wraps it into a record # and include the type as an inline + print(f'WARN: Standalone array type {name} will be wrapped in a record') deps: List[str] =[] + array_type = self.json_type_to_avro_type(json_object, name, avro_name(name), namespace, deps, json_schema, base_uri, avro_schema, record_stack) avro_array = { 'type': 'record', - 'name': avro_name(name)+'_array', + 'name': avro_name(name)+'_wrapper', 'namespace': self.utility_namespace, 'fields': [ { 'name': 'items', - 'type': self.json_type_to_avro_type(json_object, name, avro_name(name), namespace, deps, json_schema, base_uri, avro_schema, record_stack) + 'type': array_type } ] } + if isinstance(array_type, dict) and 'dependencies' in array_type: + deps.extend(array_type['dependencies']) + del array_type['dependencies'] if 'description' in json_object: avro_array['doc'] = json_object['description'] if len(deps) > 0: - avro_array['dependencies'] = deps + avro_array['dependencies'] = deps return avro_array @@ -945,7 +982,7 @@ def 
json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp if len(record_stack) > 0: # if we have a record stack, we need to add the current name to # the namespace since nested types are disambiguated by their namespace - namespace = namespace + '.' + record_stack[-1] + namespace = namespace + '.' + record_stack[-1] + "_types" # at this point we have a record type avro_record = { 'type': 'record', @@ -959,6 +996,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp if record_name in record_stack: # to break the cycle, we will use a containment type that references # the record that is being defined + print(f'WARN: Circular dependency found for record {record_name}. Creating {record_name}_ref.') ref_name = avro_name(record_name + '_ref') return { 'type': 'record', @@ -967,7 +1005,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp 'fields': [ { 'name': record_name, - 'type': record_name + 'type': namespace + '.' + record_name } ] } @@ -1065,20 +1103,46 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp extension_types = [] prop_docs = '' - if 'patternProperties' in json_object and isinstance(json_object['patternProperties'], dict): + if 'patternProperties' in json_object and isinstance(json_object['patternProperties'], dict) and len(json_object['patternProperties']) > 0: # pattern properties are represented as a record with field names that are the patterns pattern_props = json_object['patternProperties'] for pattern_name, props in pattern_props.items(): - prop_type = self.ensure_type(self.json_type_to_avro_type(props, record_name, pattern_name, namespace, dependencies, json_schema, base_uri, avro_schema, record_stack)) + deps = [] + prop_type = self.ensure_type(self.json_type_to_avro_type(props, record_name, pattern_name, namespace, deps, json_schema, base_uri, avro_schema, record_stack)) + if isinstance(prop_type, dict) and 'type' in prop_type and (prop_type['type'] == 'record' or prop_type['type'] == 'enum'): + if 'dependencies' in prop_type: + deps.extend(prop_type['dependencies']) + del prop_type['dependencies'] + prop_type['namespace'] = namespace + self.register_type(avro_schema, prop_type) + prop_type_ref = prop_type['namespace']+'.'+prop_type['name'] + dependencies.append(prop_type_ref) + else: + dependencies.extend(deps) + if isinstance(prop_type, str) and not prop_type in primitive_types: + dependencies.append(prop_type) if self.is_empty_type(prop_type): prop_type = generic_type() prop_docs += f"Name pattern '{pattern_name}': [{self.get_field_type_name({'type':prop_type})}]. 
" extension_types.append(prop_type) - if 'additionalProperties' in json_object and isinstance(json_object['additionalProperties'], dict): + if 'additionalProperties' in json_object and isinstance(json_object['additionalProperties'], dict) and len(json_object['additionalProperties']) > 0: # additional properties are represented as a map of string to the type of the value additional_props = json_object['additionalProperties'] + deps = [] values_type = self.json_type_to_avro_type(additional_props, record_name, record_name + '_extensions', namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) + if isinstance(values_type, dict) and 'type' in values_type and (values_type['type'] == 'record' or values_type['type'] == 'enum'): + if 'dependencies' in values_type: + deps.extend(values_type['dependencies']) + del values_type['dependencies'] + values_type['namespace'] = namespace + self.register_type(avro_schema, values_type) + values_type_ref = values_type['namespace']+'.'+values_type['name'] + dependencies.append(values_type_ref) + else: + dependencies.extend(deps) + if isinstance(values_type, str) and not values_type in primitive_types: + dependencies.append(values_type) if self.is_empty_type(values_type): values_type = generic_type() prop_docs += f"Extra properties: [{self.get_field_type_name({'type':values_type})}]. " @@ -1100,7 +1164,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp doc += prop_docs avro_alternate_record = { 'type': 'map', - 'name': avro_record['name'] + '_map', + 'name': avro_record['name'], 'values': field_types, 'doc': doc, 'dependencies': [namespace + '.' + record_name] @@ -1122,63 +1186,21 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp return avro_record - def find_schema_node(self, test, avro_schema, recursion_stack = []): - """Find the first schema node in the avro_schema matching the test""" - for recursion_item in recursion_stack: - if avro_schema is recursion_item: - raise ValueError('Cyclical reference detected in schema') - if len(recursion_stack) > self.max_recursion_depth: - raise ValueError('Maximum recursion depth exceeded in schema') - try: - recursion_stack.append(avro_schema) - if isinstance(avro_schema, dict): - test_node = test(avro_schema) - if test_node: - return avro_schema - for k, v in avro_schema.items(): - if isinstance(v, (dict,list)): - node = self.find_schema_node(test, v, recursion_stack) - if node: - return node - elif isinstance(avro_schema, list): - for item in avro_schema: - node = self.find_schema_node(test, item, recursion_stack) - if node: - return node - return None - finally: - recursion_stack.pop() - - def set_schema_node(self, test, replacement, avro_schema): - """Set the first schema node in the avro_schema matching the test to the replacement""" - if isinstance(avro_schema, dict): - test_node = test(avro_schema) - if test_node: - avro_schema.clear() - avro_schema.update(replacement) - return - for k, v in avro_schema.items(): - if isinstance(v, (dict,list)): - self.set_schema_node(test, replacement, v) - elif isinstance(avro_schema, list): - for item in avro_schema: - self.set_schema_node(test, replacement, item) - - + def postprocess_schema(self, avro_schema: list) -> None: """ Post-process the Avro Schema for cases wheer we need a second pass """ result: dict if len(self.types_with_unmerged_types)>0: - types_with_unmerged_types = self.types_with_unmerged_types + types_with_unmerged_types = copy.deepcopy(self.types_with_unmerged_types) 
self.types_with_unmerged_types = [] for ref_type in types_with_unmerged_types: # find ref_type anywhere in the avro_schema graph, matching # on name and namespace. find_fn = lambda t: 'name' in t and t['name'] == ref_type['name'] and 'namespace' in t and t['namespace'] == ref_type['namespace'] - type = self.find_schema_node(find_fn, avro_schema) + type = find_schema_node(find_fn, avro_schema) if not type: - continue + raise ValueError(f"Couldn't find type {ref_type['namespace']}.{ref_type['name']} in the Avro Schema.") # resolve the unmerged types local_name = type.get('name') if not isinstance(type, dict): @@ -1220,8 +1242,8 @@ def postprocess_schema(self, avro_schema: list) -> None: # unmerged field containers have fields. type['name'] = type['name'] + '_item' type['fields'] = [{'name': 'value', 'type': merge_result }] - merge_result = type - self.set_schema_node(find_fn, merge_result, avro_schema) + merge_result = copy.deepcopy(type) + set_schema_node(find_fn, merge_result, avro_schema) def process_definition_list(self, json_schema, namespace, base_uri, avro_schema, record_stack, schema_name, json_schema_list): """Process a schema definition list.""" @@ -1236,8 +1258,9 @@ def process_definition_list(self, json_schema, namespace, base_uri, avro_schema, # it's a schema definition list self.process_definition_list(json_schema, namespace, base_uri, avro_schema, record_stack, schema_name, schema) - def process_definition(self, json_schema, namespace, base_uri, avro_schema, record_stack, schema_name, schema): + def process_definition(self, json_schema, namespace, base_uri, avro_schema, record_stack, schema_name, schema, is_root: bool = False): """ Process a schema definition. """ + avro_schema_item = None avro_schema_item_list = self.json_schema_object_to_avro_record(schema_name, schema, namespace, json_schema, base_uri, avro_schema, record_stack) if not isinstance(avro_schema_item_list, list) and not isinstance(avro_schema_item_list, dict): # skip if the record couldn't be resolved @@ -1258,6 +1281,26 @@ def process_definition(self, json_schema, namespace, base_uri, avro_schema, reco (avro_schema_item.get('type') == 'record' or avro_schema_item.get('type') == 'enum' or avro_schema_item.get('type') == 'fixed'): # we only register record/enum as type. 
the other defs are mix-ins self.register_type(avro_schema, avro_schema_item) + elif is_root: + # at the root, we will wrap the type in a record to make it top-level + deps = avro_schema_item.get('dependencies',[]) + if 'dependencies' in avro_schema_item: + del avro_schema_item['dependencies'] + avro_schema_wrapper = { + 'type': 'record', + 'name': schema_name, + 'namespace': avro_schema_item.get('namespace',namespace), + 'fields': [ + { + 'name': avro_schema_item['name'], + 'type': avro_schema_item + } + ] + } + if len(deps) > 0: + avro_schema_wrapper['dependencies'] = deps + avro_schema_item = avro_schema_wrapper + self.register_type(avro_schema, avro_schema_item) def id_to_avro_namespace(self, id: str) -> str: @@ -1304,7 +1347,7 @@ def jsons_to_avro(self, json_schema: dict | list, namespace: str, base_uri: str) if ref: ref_schema, json_doc = self.resolve_reference(json_schema, base_uri, json_schema) json_schema = self.merge_json_schemas([json_schema, ref_schema], intersect=False) - self.process_definition(json_schema, namespace, base_uri, avro_schema, record_stack, schema_name, json_schema) + self.process_definition(json_schema, namespace, base_uri, avro_schema, record_stack, schema_name, json_schema, is_root=True) # postprocessing pass self.postprocess_schema(avro_schema) @@ -1365,10 +1408,10 @@ def convert_jsons_to_avro(self, json_schema_file_path: str, avro_schema_path: st def convert_jsons_to_avro(json_schema_file_path: str, avro_schema_path: str, namespace: str = '', utility_namespace = '', maximize_compatibility = False) -> list | dict: """Convert JSON schema file to Avro schema file.""" - try: - converter = JsonToAvroConverter() - converter.maximize_compatibility = maximize_compatibility - return converter.convert_jsons_to_avro(json_schema_file_path, avro_schema_path, namespace, utility_namespace) - except Exception as e: - print(f'Error converting JSON {json_schema_file_path} to Avro: {e.args[0]}') - return [] \ No newline at end of file + #try: + converter = JsonToAvroConverter() + converter.maximize_compatibility = maximize_compatibility + return converter.convert_jsons_to_avro(json_schema_file_path, avro_schema_path, namespace, utility_namespace) + #except Exception as e: + # print(f'Error converting JSON {json_schema_file_path} to Avro: {e.args[0]}') + # return [] \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0ab427e..cb98352 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,10 +26,13 @@ dependencies = [ "jsonpointer>=2.4", "fastavro>=1.9.4", "jsonpath-ng>=1.6.1", + "jsoncomparison>=1.1.0", ] [project.scripts] avrotize = "avrotize.avrotize:main" [tool.setuptools_scm] -write_to = "avrotize/_version.py" \ No newline at end of file +write_to = "avrotize/_version.py" +[tool.poetry.dev-dependencies] +pytest-cov = "^3.0" diff --git a/requirements.txt b/requirements.txt index 050bd7e..cb9c266 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,5 @@ pyarrow==15.0.0 asn1tools==0.166.0 jsonpointer==2.4 fastavro==1.9.4 -jsonpath-ng==1.6.1 \ No newline at end of file +jsonpath-ng==1.6.1 +jsoncomparison==1.1.0 \ No newline at end of file diff --git a/test/jsons/addlprops1-ref.avsc b/test/jsons/addlprops1-ref.avsc new file mode 100644 index 0000000..a0bc47b --- /dev/null +++ b/test/jsons/addlprops1-ref.avsc @@ -0,0 +1,16 @@ +{ + "type": "record", + "name": "document", + "namespace": "com.test.example", + "fields": [ + { + "name": "document", + "type": { + "type": "map", + "name": "document", + "values": "string", + "doc": "Extra 
properties: [string]. " + } + } + ] +} \ No newline at end of file diff --git a/test/jsons/addlprops2-ref.avsc b/test/jsons/addlprops2-ref.avsc new file mode 100644 index 0000000..3b20f08 --- /dev/null +++ b/test/jsons/addlprops2-ref.avsc @@ -0,0 +1,21 @@ +{ + "type": "record", + "name": "document", + "namespace": "com.test.example", + "fields": [ + { + "name": "id", + "type": [ + "null", + "int" + ] + }, + { + "name": "name", + "type": [ + "null", + "string" + ] + } + ] +} \ No newline at end of file diff --git a/test/jsons/addlprops3-ref.avsc b/test/jsons/addlprops3-ref.avsc new file mode 100644 index 0000000..6ea4a29 --- /dev/null +++ b/test/jsons/addlprops3-ref.avsc @@ -0,0 +1,169 @@ +{ + "type": "record", + "name": "document", + "namespace": "com.test.example", + "fields": [ + { + "name": "id", + "type": [ + "null", + "int" + ] + }, + { + "name": "name", + "type": [ + "null", + "string" + ] + }, + { + "name": "settings", + "type": [ + { + "type": "record", + "name": "settings", + "namespace": "com.test.example.document", + "fields": [ + { + "name": "prop1", + "type": [ + "null", + "string" + ] + }, + { + "name": "prop2", + "type": [ + "null", + "int" + ] + }, + { + "name": "prop3", + "type": [ + "null", + { + "name": "prop3", + "type": "array", + "items": "string", + "namespace": "com.test.example.document" + } + ] + }, + { + "name": "prop4", + "type": [ + "null", + { + "name": "prop4", + "type": "record", + "namespace": "com.test.example.document.settings", + "fields": [ + { + "name": "prop5", + "type": [ + "null", + "int" + ] + } + ] + } + ] + }, + { + "name": "prop6", + "type": [ + "boolean", + "int", + "null" + ] + } + ] + }, + { + "type": "map", + "name": "settings", + "values": [ + "null", + "string", + "int", + { + "name": "prop3", + "type": "array", + "items": "string", + "namespace": "com.test.example.document" + }, + "com.test.example.document.settings.prop4", + "boolean" + ], + "doc": "Mixed dynamic: 'prop1': [null, string], 'prop2': [null, int], 'prop3': [null, array], 'prop4': [null, com.test.example.document.settings.prop4], 'prop6': [boolean, int, null]. Extra properties: [string]. " + }, + "null" + ] + }, + { + "name": "other", + "type": [ + { + "type": "record", + "name": "other", + "namespace": "com.test.example.document", + "fields": [ + { + "name": "prop1", + "type": [ + "null", + "string" + ] + } + ] + }, + { + "type": "map", + "name": "other", + "values": [ + "null", + "string", + "int" + ], + "doc": "Mixed dynamic: 'prop1': [null, string]. Name pattern 'foo*': [int]. Name pattern 'bar*': [string]. " + }, + "null" + ] + }, + { + "name": "yetAnother", + "type": [ + "null", + { + "name": "yetAnother", + "type": "map", + "values": [ + "int", + "string", + "boolean" + ], + "doc": "Name pattern 'foo*': [int]. Name pattern 'bar*': [string]. Extra properties: [boolean]. ", + "namespace": "com.test.example" + } + ] + }, + { + "name": "more", + "type": [ + "null", + { + "name": "more", + "type": "map", + "values": [ + "int", + "string" + ], + "doc": "Name pattern 'foo*': [int]. Name pattern 'bar*': [string]. 
", + "namespace": "com.test.example" + } + ] + } + ] +} \ No newline at end of file diff --git a/test/jsons/circularrefs-ref.avsc b/test/jsons/circularrefs-ref.avsc new file mode 100644 index 0000000..1a11d48 --- /dev/null +++ b/test/jsons/circularrefs-ref.avsc @@ -0,0 +1,127 @@ +{ + "type": "record", + "name": "document", + "namespace": "com.test.example", + "fields": [ + { + "name": "def1", + "type": [ + "null", + { + "name": "Definition1", + "type": "record", + "namespace": "com.test.example", + "fields": [ + { + "name": "prop1", + "type": [ + "null", + "com.test.example.Definition1" + ] + } + ] + } + ] + }, + { + "name": "def2", + "type": [ + "null", + { + "name": "Definition2", + "type": "record", + "namespace": "com.test.example", + "fields": [ + { + "name": "nested", + "type": [ + "null", + { + "name": "nested", + "type": "record", + "namespace": "com.test.example.Definition2", + "fields": [ + { + "name": "prop2", + "type": [ + "null", + "com.test.example.Definition2" + ] + } + ] + } + ] + } + ] + } + ] + }, + { + "name": "def3", + "type": [ + "null", + { + "name": "Definition3", + "type": "record", + "namespace": "com.test.example", + "fields": [ + { + "name": "anyOfProp", + "type": [ + "com.test.example.Definition3", + "string", + "null" + ] + } + ] + } + ] + }, + { + "name": "def4", + "type": [ + "null", + { + "name": "Definition4", + "type": "record", + "namespace": "com.test.example", + "fields": [ + { + "name": "prop4", + "type": [ + "null", + { + "name": "Definition4a", + "type": "record", + "namespace": "com.test.example", + "fields": [ + { + "name": "nested", + "type": [ + "null", + { + "name": "nested", + "type": "record", + "namespace": "com.test.example.Definition4a", + "fields": [ + { + "name": "prop5", + "type": [ + "null", + "com.test.example.Definition4" + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/test/jsons/circularrefs.json b/test/jsons/circularrefs.json index 8f373ca..f7f6660 100644 --- a/test/jsons/circularrefs.json +++ b/test/jsons/circularrefs.json @@ -12,6 +12,10 @@ "def3": { "title": "Definition3", "$ref": "#/definitions/Definition3" + }, + "def4": { + "title": "Defînition4", + "$ref": "#/definitions/Definition4" } }, "definitions": { @@ -50,6 +54,27 @@ ] } } + }, + "Definition4": { + "type": "object", + "properties": { + "prop4": { + "$ref": "#/definitions/Definition4a" + } + } + }, + "Definition4a": { + "type": "object", + "properties": { + "nested": { + "type": "object", + "properties": { + "prop5": { + "$ref": "#/definitions/Definition4" + } + } + } + } } } } \ No newline at end of file diff --git a/test/jsons/person-ref.avsc b/test/jsons/person-ref.avsc new file mode 100644 index 0000000..ad98f31 --- /dev/null +++ b/test/jsons/person-ref.avsc @@ -0,0 +1,51 @@ +{ + "type": "record", + "name": "document", + "namespace": "com.test.example", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": [ + "null", + "int" + ] + }, + { + "name": "contact", + "type": [ + { + "name": "contact_2", + "type": "record", + "namespace": "com.test.example.contact", + "fields": [ + { + "name": "phoneNumber", + "type": [ + "null", + "string" + ] + } + ] + }, + { + "name": "contact_1", + "type": "record", + "namespace": "com.test.example.contact", + "fields": [ + { + "name": "email", + "type": [ + "null", + "string" + ] + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/test/test_jsontoavro.py b/test/test_jsontoavro.py index 785711f..b488e73 100644 --- 
a/test/test_jsontoavro.py +++ b/test/test_jsontoavro.py @@ -1,7 +1,9 @@ +import json import os import sys from os import path, getcwd from fastavro.schema import load_schema +from jsoncomparison import NO_DIFF, Compare import pytest from avrotize.jsonstoavro import convert_jsons_to_avro @@ -18,233 +20,96 @@ class TestJsonsToAvro(unittest.TestCase): def validate_avro_schema(self, avro_file_path): load_schema(avro_file_path) - def test_convert_rootarray_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "rootarray.jsons") - avro_path = path.join(cwd, "test", "tmp", "rootarray.avsc") - dir = os.path.dirname(avro_path) + def create_avro_from_jsons(self, jsons_path, avro_path = '', avro_ref_path = '', namespace = "com.test.example"): + cwd = getcwd() + if jsons_path.startswith("http"): + jsons_full_path = jsons_path + else: + jsons_full_path = path.join(cwd, "test", "jsons", jsons_path) + if not avro_path: + avro_path = jsons_path.replace(".jsons", ".avsc").replace(".json", ".avsc") + if not avro_ref_path: + # '-ref' appended to the avsc base file name + avro_ref_full_path = path.join(cwd, "test", "jsons", avro_path.replace(".avsc", "-ref.avsc")) + else: + avro_ref_full_path = path.join(cwd, "test", "jsons", "addlprops1-ref.avsc") + avro_full_path = path.join(cwd, "test", "tmp", avro_path) + dir = os.path.dirname(avro_full_path) if not os.path.exists(dir): os.makedirs(dir) + + convert_jsons_to_avro(jsons_full_path, avro_full_path, namespace) + self.validate_avro_schema(avro_full_path) + + if os.path.exists(avro_ref_full_path): + with open(avro_ref_full_path, "r") as ref: + expected = json.loads(ref.read()) + with open(avro_full_path, "r") as ref: + actual = json.loads(ref.read()) + diff = Compare().check(actual, expected) + assert diff == NO_DIFF + - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) - def test_convert_circularrefs_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "circularrefs.json") - avro_path = path.join(cwd, "test", "tmp", "circularrefs.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) - + def test_convert_rootarray_jsons_to_avro(self): + self.create_avro_from_jsons("rootarray.jsons", "rootarray.avsc") + + def test_convert_circularrefs_jsons_to_avro(self): + self.create_avro_from_jsons("circularrefs.json", "circularrefs.avsc") + def test_convert_address_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "address.jsons") - avro_path = path.join(cwd, "test", "tmp", "address.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("address.jsons", "address.avsc") def test_convert_movie_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "movie.jsons") - avro_path = path.join(cwd, "test", "tmp", "movie.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("movie.jsons", "movie.avsc") def test_convert_person_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "person.jsons") - avro_path = 
path.join(cwd, "test", "tmp", "person.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("person.jsons", "person.avsc") def test_convert_employee_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "employee.jsons") - avro_path = path.join(cwd, "test", "tmp", "employee.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("employee.jsons", "employee.avsc") def test_convert_azurestorage_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "azurestorage.jsons") - avro_path = path.join(cwd, "test", "tmp", "azurestorage.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "microsoft.azure.storage") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("azurestorage.jsons", "azurestorage.avsc", namespace="microsoft.azure.storage") - #@pytest.mark.skip(reason="flaky due to remote resolution") def test_convert_azurestorage_remote_jsons_to_avro(self): - cwd = getcwd() jsons_path = "https://raw.githubusercontent.com:443/Azure/azure-rest-api-specs/master/specification/eventgrid/data-plane/Microsoft.Storage/stable/2018-01-01/Storage.json" - avro_path = path.join(cwd, "test", "tmp", "azurestorage.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "microsoft.azure.storage") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons(jsons_path, "azurestorage.avsc", namespace="microsoft.azure.storage") - #@pytest.mark.skip(reason="flaky due to remote resolution") def test_convert_azurestorage_remote_deeplink_jsons_to_avro(self): - cwd = getcwd() jsons_path = "https://raw.githubusercontent.com:443/Azure/azure-rest-api-specs/master/specification/eventgrid/data-plane/Microsoft.Storage/stable/2018-01-01/Storage.json#/definitions/StorageLifecyclePolicyCompletedEventData" - avro_path = path.join(cwd, "test", "tmp", "azurestoragedeep.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "microsoft.azure.storage") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons(jsons_path, "azurestoragedeep.avsc", namespace="microsoft.azure.storage") def test_convert_addlprops1_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "addlprops1.json") - avro_path = path.join(cwd, "test", "tmp", "addlprops1.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("addlprops1.json", "addlprops1.avsc") def test_convert_addlprops2_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "addlprops2.json") - avro_path = path.join(cwd, "test", "tmp", "addlprops2.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) - + self.create_avro_from_jsons("addlprops2.json", 
"addlprops2.avsc") + def test_convert_addlprops3_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "addlprops3.json") - avro_path = path.join(cwd, "test", "tmp", "addlprops3.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) - + self.create_avro_from_jsons("addlprops3.json", "addlprops3.avsc") + def test_convert_patternprops1_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "patternprops1.json") - avro_path = path.join(cwd, "test", "tmp", "patternprops1.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("patternprops1.json", "patternprops1.avsc") def test_convert_patternprops2_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "patternprops2.json") - avro_path = path.join(cwd, "test", "tmp", "patternprops2.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path, "example.com") - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("patternprops2.json", "patternprops2.avsc") def test_convert_avro_avsc_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "avro-avsc.json") - avro_path = path.join(cwd, "test", "tmp", "avro-avsc.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path) - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("avro-avsc.json", "avro-avsc.avsc") def test_convert_clouidify_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "cloudify.json") - avro_path = path.join(cwd, "test", "tmp", "cloudify.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - - convert_jsons_to_avro(jsons_path, avro_path) - #self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("cloudify.json", "cloudify.avsc") def test_convert_databricks_asset_bundles_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "databricks-asset-bundles.json") - avro_path = path.join(cwd, "test", "tmp", "databricks-asset-bundles.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - convert_jsons_to_avro(jsons_path, avro_path) - self.validate_avro_schema(avro_path) - + self.create_avro_from_jsons("databricks-asset-bundles.json", "databricks-asset-bundles.avsc") + def test_convert_jfrog_pipelines_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "jfrog-pipelines.json") - avro_path = path.join(cwd, "test", "tmp", "jfrog-pipelines.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir).makedirs(dir) - convert_jsons_to_avro(jsons_path, avro_path) - self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("jfrog-pipelines.json", "jfrog-pipelines.avsc") def test_convert_kubernetes_definitions_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "kubernetes-definitions.json") - avro_path = path.join(cwd, "test", "tmp", "kubernetes-definitions.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - 
convert_jsons_to_avro(jsons_path, avro_path) - #self.validate_avro_schema(avro_path) + self.create_avro_from_jsons("kubernetes-definitions.json", "kubernetes-definitions.avsc") - # exclude - @pytest.mark.skip(reason="excluded") def test_convert_travis_jsons_to_avro(self): - cwd = getcwd() - jsons_path = path.join(cwd, "test", "jsons", "travis.json") - avro_path = path.join(cwd, "test", "tmp", "travis.avsc") - dir = os.path.dirname(avro_path) - if not os.path.exists(dir): - os.makedirs(dir) - if os.path.exists(avro_path): - os.remove(avro_path) - convert_jsons_to_avro(jsons_path, avro_path) - self.validate_avro_schema(avro_path) - - - - - - - -if __name__ == '__main__': - unittest.main() \ No newline at end of file + self.create_avro_from_jsons("travis.json", "travis.avsc")
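
Usage note (sketch, not part of the patch): find_schema_node and set_schema_node are now module-level functions in avrotize/common.py rather than methods on JsonToAvroConverter, so they can be exercised directly without constructing a converter. A minimal, hypothetical example of how they might be used; the schema literal and names below are illustrative only and do not come from the test suite:

    from avrotize.common import find_schema_node, set_schema_node

    # Illustrative Avro schema fragment with one nested record type (hypothetical data).
    schema = {
        "type": "record",
        "name": "document",
        "namespace": "com.test.example",
        "fields": [
            {
                "name": "contact",
                "type": {
                    "type": "record",
                    "name": "contact_1",
                    "namespace": "com.test.example.contact",
                    "fields": [{"name": "email", "type": ["null", "string"]}],
                },
            }
        ],
    }

    # Match on name and namespace, mirroring the find_fn lambda used in postprocess_schema().
    find_fn = lambda t: t.get("name") == "contact_1" and t.get("namespace") == "com.test.example.contact"

    # find_schema_node walks dicts and lists depth-first and returns the first matching dict node.
    node = find_schema_node(find_fn, schema)
    assert node is not None and node["type"] == "record"

    # set_schema_node mutates the matched node in place (clear + update) instead of returning a copy.
    set_schema_node(find_fn, {"type": "string"}, schema)
    assert schema["fields"][0]["type"] == {"type": "string"}

Moving the two helpers into avrotize/common.py makes them reusable outside the converter class, which is why postprocess_schema() in jsonstoavro.py now calls the imported functions instead of self methods.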