|
| 1 | +#!/usr/bin/env python3 |
| 2 | +import subprocess |
| 3 | +import json |
| 4 | +from pprint import pprint |
| 5 | + |
| 6 | + |
| 7 | +def from_camel_case(name): |
| 8 | + """Convert a camelCase identifier to snake case.""" |
| 9 | + l = len(name) |
| 10 | + name += "X" |
| 11 | + res = "" |
| 12 | + start = 0 |
| 13 | + for i in range(len(name)): |
| 14 | + if i > 0 and name[i].isupper(): |
| 15 | + res += name[start:i].lower() |
| 16 | + if i != l: |
| 17 | + res += "_" |
| 18 | + start = i |
| 19 | + return res |
| 20 | + |
| 21 | + |
| 22 | +def generate_method(method): |
| 23 | + assert method["paramStructure"] == "by-position" |
| 24 | + name = method["name"] |
| 25 | + params = method["params"] |
| 26 | + args_typed = ", ".join( |
| 27 | + [ |
| 28 | + from_camel_case(param["name"]) |
| 29 | + + ": " |
| 30 | + + (decode_type(param["schema"]) or "Any") |
| 31 | + for param in params |
| 32 | + ] |
| 33 | + ) |
| 34 | + args = ", ".join([from_camel_case(param["name"]) for param in params]) |
| 35 | + result_type = decode_type(method["result"]["schema"]) |
| 36 | + print(f"def {name}({args_typed}) -> {result_type}:") |
| 37 | + if "description" in method: |
| 38 | + description = method["description"] |
| 39 | + if "\n" in description: |
| 40 | + print(f' """{method["description"].lstrip()}\n """') |
| 41 | + else: |
| 42 | + print(f' """{method["description"].lstrip()}"""') |
| 43 | + print(f" rpc_call({args})") |
| 44 | + print() |
| 45 | + |
| 46 | + |
| 47 | +def generate_openrpc_methods(openrpc_spec): |
| 48 | + for method in openrpc_spec["methods"]: |
| 49 | + generate_method(method) |
| 50 | + |
| 51 | + |
| 52 | +def decode_type(property_desc): |
| 53 | + if "anyOf" in property_desc: |
| 54 | + t = property_desc["anyOf"] |
| 55 | + assert len(t) == 2 |
| 56 | + assert t[1] == {"type": "null"} |
| 57 | + ref = t[0]["$ref"] |
| 58 | + assert ref.startswith("#/components/schemas/") |
| 59 | + return f'Optional["{ref.removeprefix("#/components/schemas/")}"]' |
| 60 | + elif "$ref" in property_desc: |
| 61 | + t = property_desc["$ref"] |
| 62 | + assert t.startswith("#/components/schemas/") |
| 63 | + t = t.removeprefix("#/components/schemas/") |
| 64 | + return f'"{t}"' |
| 65 | + elif property_desc["type"] == "null": |
| 66 | + return "None" |
| 67 | + elif "null" in property_desc["type"]: |
| 68 | + assert len(property_desc["type"]) == 2 |
| 69 | + assert property_desc["type"][1] == "null" |
| 70 | + property_desc["type"] = property_desc["type"][0] |
| 71 | + t = decode_type(property_desc) |
| 72 | + if t: |
| 73 | + return f"Optional[{t}]" |
| 74 | + elif property_desc["type"] == "boolean": |
| 75 | + return "bool" |
| 76 | + elif property_desc["type"] == "integer": |
| 77 | + return "int" |
| 78 | + elif property_desc["type"] == "number" and property_desc["format"] == "double": |
| 79 | + return "float" |
| 80 | + elif property_desc["type"] == "string": |
| 81 | + return "str" |
| 82 | + elif property_desc["type"] == "array": |
| 83 | + if isinstance(property_desc["items"], list): |
| 84 | + items_desc = ", ".join(decode_type(x) for x in property_desc["items"]) |
| 85 | + return f"Tuple[{items_desc}]" |
| 86 | + else: |
| 87 | + items_type = decode_type(property_desc["items"]) |
| 88 | + return f"list[{items_type}]" |
| 89 | + elif "additionalProperties" in property_desc: |
| 90 | + additional_properties = property_desc["additionalProperties"] |
| 91 | + return f"dict[Any, {decode_type(additional_properties)}]" |
| 92 | + return None |
| 93 | + |
| 94 | + |
| 95 | +def generate_variant(variant) -> str: |
| 96 | + """Prints generated type for enum variant. |
| 97 | +
|
| 98 | + Returns the name of the generated type. |
| 99 | + """ |
| 100 | + assert variant["type"] == "object" |
| 101 | + kind = variant["properties"]["kind"] |
| 102 | + assert kind["type"] == "string" |
| 103 | + assert len(kind["enum"]) == 1 |
| 104 | + kind_name = kind["enum"][0] |
| 105 | + kind_name = kind_name[0].upper() + kind_name[1:] |
| 106 | + |
| 107 | + print(f" @dataclass(kw_only=True)") |
| 108 | + print(f" class {kind_name}:") |
| 109 | + for property_name, property_desc in variant["properties"].items(): |
| 110 | + property_name = from_camel_case(property_name) |
| 111 | + if t := decode_type(property_desc): |
| 112 | + print(f" {property_name}: {t}") |
| 113 | + else: |
| 114 | + print("# TODO") |
| 115 | + pprint(property_name) |
| 116 | + pprint(property_desc) |
| 117 | + print() |
| 118 | + |
| 119 | + # pprint(variant) |
| 120 | + |
| 121 | + return kind_name |
| 122 | + |
| 123 | + |
| 124 | +def generate_type(type_name, schema): |
| 125 | + if "oneOf" in schema: |
| 126 | + if all(x["type"] == "string" for x in schema["oneOf"]): |
| 127 | + # Simple enumeration consisting only of various string types. |
| 128 | + print(f"class {type_name}(Enum):") |
| 129 | + for x in schema["oneOf"]: |
| 130 | + for e in x["enum"]: |
| 131 | + print(f' {from_camel_case(e).upper()} = "{e}"') |
| 132 | + else: |
| 133 | + # Union type. |
| 134 | + namespace = f"{type_name}Enum" |
| 135 | + print(f"class {namespace}:") |
| 136 | + kind_names = [f"{namespace}.{generate_variant(x)}" for x in schema["oneOf"]] |
| 137 | + |
| 138 | + print(f"{type_name}: TypeAlias = {' | '.join(kind_names)}") |
| 139 | + elif schema["type"] == "string": |
| 140 | + print(f"class {type_name}(Enum):") |
| 141 | + for e in schema["enum"]: |
| 142 | + print(f' {from_camel_case(e).upper()} = "{e}"') |
| 143 | + else: |
| 144 | + print("@dataclass(kw_only=True)") |
| 145 | + print(f"class {type_name}:") |
| 146 | + for property_name, property_desc in schema["properties"].items(): |
| 147 | + property_name = from_camel_case(property_name) |
| 148 | + if decode_type(property_desc): |
| 149 | + print(f" {property_name}: {decode_type(property_desc)}") |
| 150 | + else: |
| 151 | + print(f"# TODO {property_name}") |
| 152 | + pprint(property_desc) |
| 153 | + |
| 154 | + print() |
| 155 | + |
| 156 | + |
| 157 | +def generate_openrpc_types(openrpc_spec): |
| 158 | + for type_name, schema in openrpc_spec["components"]["schemas"].items(): |
| 159 | + generate_type(type_name, schema) |
| 160 | + |
| 161 | + |
| 162 | +def main(): |
| 163 | + openrpc_spec = json.loads( |
| 164 | + subprocess.run( |
| 165 | + ["deltachat-rpc-server", "--openrpc"], capture_output=True |
| 166 | + ).stdout |
| 167 | + ) |
| 168 | + print("from dataclasses import dataclass") |
| 169 | + print("from enum import Enum") |
| 170 | + print("from typing import TypeAlias, Union, Optional, Tuple, Any") |
| 171 | + generate_openrpc_types(openrpc_spec) |
| 172 | + generate_openrpc_methods(openrpc_spec) |
| 173 | + |
| 174 | + |
| 175 | +if __name__ == "__main__": |
| 176 | + main() |
0 commit comments