From 42428eb37d3e131a670adabc5588cabb980bc9ba Mon Sep 17 00:00:00 2001 From: voidZXL Date: Sat, 19 Oct 2024 13:07:58 +0800 Subject: [PATCH] adding json-schema generator and python code generator, support typing.Self --- tests/test_cls.py | 29 +- tests/test_func.py | 25 +- tests/test_type.py | 1 + utype/__init__.py | 2 +- utype/parser/base.py | 7 +- utype/parser/cls.py | 2 + utype/parser/field.py | 8 +- utype/parser/func.py | 23 +- utype/parser/options.py | 4 + utype/parser/rule.py | 210 ++++++++++-- utype/specs/json_schema/__init__.py | 2 + utype/specs/json_schema/constant.py | 118 +++++++ .../generator.py} | 104 +----- utype/specs/json_schema/parser.py | 298 ++++++++++++++++++ utype/specs/python/__init__.py | 1 + utype/specs/python/generator.py | 168 ++++++++++ utype/types.py | 2 +- utype/utils/base.py | 208 +++++++++++- utype/utils/compat.py | 12 + utype/utils/datastructures.py | 44 +++ utype/utils/functional.py | 32 ++ utype/utils/transform.py | 22 +- 22 files changed, 1173 insertions(+), 149 deletions(-) create mode 100644 utype/specs/json_schema/__init__.py create mode 100644 utype/specs/json_schema/constant.py rename utype/specs/{json_schema.py => json_schema/generator.py} (81%) create mode 100644 utype/specs/json_schema/parser.py create mode 100644 utype/specs/python/__init__.py create mode 100644 utype/specs/python/generator.py diff --git a/tests/test_cls.py b/tests/test_cls.py index 1593911..25e86a9 100644 --- a/tests/test_cls.py +++ b/tests/test_cls.py @@ -10,7 +10,7 @@ import utype from utype import (DataClass, Field, Options, Rule, Schema, exc, register_transformer, types) -from utype.utils.compat import Final +from utype.utils.compat import Final, Self @pytest.fixture(params=(False, True)) @@ -319,19 +319,28 @@ class T(Schema): T(forward_in_dict={1: [2], 2: [1]}) # test not-module-level self ref - class Self(Schema): + class SelfRef(Schema): name: str - to_self: "Self" = Field(required=False) - self_lst: List["Self"] = Field(default_factory=list) + 
to_self: "SelfRef" = Field(required=False) + self_lst: List["SelfRef"] = Field(default_factory=list) - sf = Self(name=1, to_self=b'{"name":"test"}') + sf = SelfRef(name=1, to_self=b'{"name":"test"}') assert sf.to_self.name == "test" assert sf.self_lst == [] - sf2 = Self(name="t2", self_lst=[dict(sf)]) + sf2 = SelfRef(name="t2", self_lst=[dict(sf)]) assert sf2.self_lst[0].name == "1" assert "to_self" not in sf2 + class SelfRef2(Schema): + name: str + to_self: Self = Field(required=False) + self_lst: List[Self] = Field(default_factory=list) + + sfi = SelfRef2(name=1, to_self=b'{"name":"test"}') + assert sfi.to_self.name == "test" + assert sfi.self_lst == [] + # class ForwardSchema(Schema): # int1: 'types.PositiveInt' = Field(lt=10) # int2: 'types.PositiveInt' = Field(lt=20) @@ -340,11 +349,11 @@ class Self(Schema): def test_local_forward_ref(self): def f(u=0): - class Self(Schema): + class LocSelf(Schema): num: int = u - to_self: Optional["Self"] = None - list_self: List["Self"] = utype.Field(default_factory=list) - data = Self(to_self={'to_self': {}}, list_self=[{'list_self': []}]) + to_self: Optional["LocSelf"] = None + list_self: List["LocSelf"] = utype.Field(default_factory=list) + data = LocSelf(to_self={'to_self': {}}, list_self=[{'list_self': []}]) return data.to_self.to_self.num, data.list_self[0].num assert f(1) == (1, 1) diff --git a/tests/test_func.py b/tests/test_func.py index 8ceb557..34be0cf 100644 --- a/tests/test_func.py +++ b/tests/test_func.py @@ -8,7 +8,7 @@ import utype from utype import Field, Options, Param, exc, parse, types -from utype.utils.compat import Final +from utype.utils.compat import Final, Self @pytest.fixture(params=(False, True)) @@ -21,6 +21,22 @@ def on_error(request): return request.param +class schemas: + class MySchema(utype.Schema): + a: int + b: int + result: int + + @classmethod + @utype.parse + def add(cls, a: int, b: int) -> Self: + return dict( + a=a, + b=b, + result=a+b + ) + + class TestFunc: def test_basic(self): 
import utype @@ -406,6 +422,13 @@ def fib(n: int = utype.Param(ge=0), _current: int = 0, _next: int = 1): assert fib('10', _current=10, _next=6) == 55 assert fib('10', 10, 5) == 615 # can pass through positional args + def test_self_ref(self): + result = schemas.MySchema.add('1', '2') + assert isinstance(result, schemas.MySchema) + assert result.a == 1 + assert result.b == 2 + assert result.result == 3 + def test_args_parse(self): @utype.parse def get(a): diff --git a/tests/test_type.py b/tests/test_type.py index c4b9bae..a488c43 100644 --- a/tests/test_type.py +++ b/tests/test_type.py @@ -313,6 +313,7 @@ def trans_my(trans, d, t): ], date: [ ("2020-02-20", date(2020, 2, 20), True, True), + ("20200220", date(2020, 2, 20), True, True), ("2020/02/20", date(2020, 2, 20), True, True), ("2020/2/20", date(2020, 2, 20), True, True), ("20/02/2020", date(2020, 2, 20), True, True), diff --git a/utype/__init__.py b/utype/__init__.py index 3c4b3bf..7d9b9f0 100644 --- a/utype/__init__.py +++ b/utype/__init__.py @@ -12,7 +12,7 @@ register_transformer = TypeTransformer.registry.register -VERSION = (0, 5, 6, None) +VERSION = (0, 6, 0, 'alpha') def _get_version(): diff --git a/utype/parser/base.py b/utype/parser/base.py index 8531d68..7146224 100644 --- a/utype/parser/base.py +++ b/utype/parser/base.py @@ -87,6 +87,10 @@ def __init__(self, obj, options: Options = None): def make_context(self, context=None, force_error: bool = False): return self.options.make_context(context=context, force_error=force_error) + @property + def bound(self): + return self.obj + @property def kwargs(self): return {} @@ -109,7 +113,8 @@ def parse_annotation(self, annotation): annotation=annotation, forward_refs=self.forward_refs, global_vars=self.globals, - force_clear_refs=self.is_local + force_clear_refs=self.is_local, + bound=self.bound ) @cached_property diff --git a/utype/parser/cls.py b/utype/parser/cls.py index 6910591..1d2fae9 100644 --- a/utype/parser/cls.py +++ b/utype/parser/cls.py @@ -141,6 
+141,7 @@ def generate_fields(self): forward_refs=self.forward_refs, options=self.options, force_clear_refs=self.is_local, + bound=self.bound, **self.kwargs ) except Exception as e: @@ -185,6 +186,7 @@ def generate_fields(self): forward_refs=self.forward_refs, options=self.options, force_clear_refs=self.is_local, + bound=self.bound, **self.kwargs ) except Exception as e: diff --git a/utype/parser/field.py b/utype/parser/field.py index bafde72..e6fc6d1 100644 --- a/utype/parser/field.py +++ b/utype/parser/field.py @@ -8,6 +8,7 @@ from uuid import UUID from ..utils import exceptions as exc +from ..utils.base import ParamsCollector from ..utils.compat import Literal, get_args, is_final, is_annotated, ForwardRef from ..utils.datastructures import unprovided from ..utils.functional import copy_value, get_name, multi @@ -17,7 +18,7 @@ represent = repr -class Field: +class Field(ParamsCollector): parser_field_cls = None def __init__( @@ -91,6 +92,8 @@ def __init__( min_contains: int = None, unique_items: Union[bool, ConstraintMode] = None, ): + super().__init__(locals()) + if mode: if readonly or writeonly: raise exc.ConfigError( @@ -1094,6 +1097,7 @@ def generate( positional_only: bool = False, global_vars=None, forward_refs=None, + bound=None, force_clear_refs=False, **kwargs ): @@ -1216,6 +1220,7 @@ def generate( global_vars=global_vars, forward_refs=forward_refs, forward_key=attname, + bound=bound, constraints=output_field.constraints if output_field else None, force_clear_refs=force_clear_refs ) @@ -1278,6 +1283,7 @@ def generate( global_vars=global_vars, forward_refs=forward_refs, forward_key=attname, + bound=bound, force_clear_refs=force_clear_refs ) diff --git a/utype/parser/func.py b/utype/parser/func.py index d6d7b83..afd57ab 100644 --- a/utype/parser/func.py +++ b/utype/parser/func.py @@ -33,6 +33,18 @@ def _f_pass(): class FunctionParser(BaseParser): + @property + def bound(self): + # class A: + # class B: + # def f(): + # f.__qualname__ = 'A.B.f' + # f.bound 
-> 'A.B' + name = self.obj.__qualname__ + if '.' in name: + return '.'.join(name.split('.')[:-1]) + return None + @classmethod def function_pass(cls, f): if not inspect.isfunction(f): @@ -299,10 +311,12 @@ def generate_return_types(self): if not self.return_annotation: return - self.return_type = self.parse_annotation(annotation=self.return_annotation) + self.return_type = self.parse_annotation( + annotation=self.return_annotation + ) # https://docs.python.org/3/library/typing.html#typing.Generator - if self.return_type and issubclass(self.return_type, Rule): + if self.return_type and isinstance(self.return_type, type) and issubclass(self.return_type, Rule): if self.is_generator: if self.return_type.__origin__ in (Iterable, Iterator): self.generator_yield_type = self.return_type.__args__[0] @@ -406,6 +420,7 @@ def generate_fields(self): forward_refs=self.forward_refs, options=self.options, positional_only=param.kind == param.POSITIONAL_ONLY, + bound=self.bound, **self.kwargs ) except Exception as e: @@ -760,6 +775,7 @@ def get_sync_generator( @wraps(self.obj) def eager_generator(*args, **kwargs) -> Generator: context = (options or self.options).make_context() + self.resolve_forward_refs() args, kwargs = self.get_params( args, kwargs, @@ -846,6 +862,7 @@ def get_async_generator( @wraps(self.obj) def eager_generator(*args, **kwargs) -> AsyncGenerator: context = (options or self.options).make_context() + self.resolve_forward_refs() args, kwargs = self.get_params( args, kwargs, @@ -886,6 +903,7 @@ def get_async_call( @wraps(self.obj) def eager_call(*args, **kwargs): context = (options or self.options).make_context() + self.resolve_forward_refs() args, kwargs = self.get_params( args, kwargs, @@ -915,6 +933,7 @@ def sync_call( parse_params: bool = None, parse_result: bool = None, ): + self.resolve_forward_refs() args, kwargs = self.get_params( args, kwargs, diff --git a/utype/parser/options.py b/utype/parser/options.py index 9915fd3..7e84f36 100644 --- 
a/utype/parser/options.py +++ b/utype/parser/options.py @@ -3,6 +3,7 @@ from typing import Any, Callable, List, Optional, Set, Type, Union from ..utils import exceptions as exc +# from ..utils.base import ParamsCollector from ..utils.compat import Literal from ..utils.datastructures import unprovided from ..utils.functional import multi @@ -143,6 +144,7 @@ def __init__( # if this value is another callable (like dict, list), return value() # otherwise return this value directly when attr is unprovided ): + # super().__init__({k: v for k, v in locals().items() if not unprovided(v)}) if no_data_loss: if addition is None: @@ -182,6 +184,8 @@ def __init__( for key, val in locals().items(): if unprovided(val): continue + if key.startswith('_'): + continue if hasattr(self, key): # if getattr(self, key) == val: # continue diff --git a/utype/parser/rule.py b/utype/parser/rule.py index d2edf08..cfe25a3 100644 --- a/utype/parser/rule.py +++ b/utype/parser/rule.py @@ -9,7 +9,7 @@ Mapping, Optional, Tuple, Type, TypeVar, Union, Iterator) from ..utils import exceptions as exc -from ..utils.compat import (ForwardRef, Literal, evaluate_forward_ref, +from ..utils.compat import (ForwardRef, Literal, Self, evaluate_forward_ref, get_args, get_origin, UnionType) from ..utils.datastructures import unprovided from ..utils.functional import multi, pop @@ -308,17 +308,24 @@ def _repr(_arg): return repr(_arg) return getattr(_arg, "__name__", None) or repr(_arg) - if not cls.args: - origin = getattr(cls, "__origin__", None) + origin = getattr(cls, "__origin__", None) + # if cls.__name__ == 'Rule': + # origin_repr = f'Rule[{_repr(origin)}]' if origin else cls.__name__ + # else: + origin_repr = f'{cls.__name__}[{_repr(origin)}]' if origin else cls.__name__ + + if not cls.combinator: validators = getattr(cls, "__validators__", []) constraints = ", ".join(f"{key}={val}" for key, val, c in validators) - origin_repr = "" - if origin: - origin_repr = _repr(origin) - if constraints: - origin_repr += 
", " - return f"{cls.__name__}({origin_repr}{constraints})" + args = [] + if cls.args: + args.extend([_repr(arg) for arg in cls.args]) + if constraints: + args.append(constraints) + args_repr = "(%s)" % ", ".join(args) if args else '' + return f"{origin_repr}{args_repr}" + # _origin = _repr(origin) if origin else cls.__name__ args_repr = ", ".join([_repr(arg) for arg in cls.args]) l_par = "(" if cls.combinator else "[" r_par = ")" if cls.combinator else "]" @@ -1036,6 +1043,7 @@ class Rule(metaclass=LogicalType): transformer_cls = TypeTransformer constraints_cls = Constraints context_cls = RuntimeContext + __options__ = None __origin__: type = None __applied__: bool = False @@ -1246,7 +1254,11 @@ def annotate( constraints: Dict[str, Any] = None, global_vars: Dict[str, Any] = None, forward_refs=None, - force_clear_refs=False + force_clear_refs=False, + options=None, + bound=None, + name: str = None, + description: str = None, ): args = [] ellipsis_args = False @@ -1296,7 +1308,8 @@ def annotate( arg, global_vars=global_vars, forward_refs=forward_refs, - force_clear_refs=force_clear_refs + force_clear_refs=force_clear_refs, + bound=bound ) # this annotation can be a ForwardRef # not with constraints, cause that is applied to upper layer @@ -1307,7 +1320,6 @@ def annotate( # if not args and not constraints: # return type_ - name = cls.__name__ if type_ == Union or type_ == UnionType: # in Python >= 3.10, native logic operator like int | str will be a UnionType type_ = LogicalType.any_of(*args) @@ -1324,10 +1336,12 @@ def annotate( # forward_refs=forward_refs, # global_vars=global_vars # ) - if issubclass(type_, cls): + if issubclass(type_, cls) and not name: # use the subclass name name = type_.__name__ + name = name or cls.__name__ + # if type_ == Union: # any_of = LogicalType.any_of(*args) # if not constraints: @@ -1341,6 +1355,10 @@ def annotate( attrs.update(__args__=args) if ellipsis_args: attrs.update(__ellipsis_args__=True) + if options is not None: + 
attrs.update(__options__=options) + if description: + attrs.update(__doc__=description) if constraints: attrs.update(constraints) return LogicalType(name, (cls,), attrs) @@ -1350,6 +1368,7 @@ def parse_annotation( cls, annotation, constraints=None, + bound=None, global_vars=None, forward_refs=None, forward_key=None, @@ -1363,6 +1382,18 @@ def parse_annotation( # ForwardRef annotation = ForwardRef(annotation) + if annotation is Self: + if not bound: + raise TypeError('using Self annotation must inside a class bound') + if isinstance(bound, str): + annotation = ForwardRef(bound) + elif isinstance(bound, ForwardRef): + annotation = bound + elif isinstance(bound, type): + return bound + else: + raise TypeError(f'using Self annotation got invalid class bound: {bound}') + if isinstance(annotation, ForwardRef): annotation = register_forward_ref( annotation=annotation, @@ -1407,7 +1438,8 @@ def parse_annotation( constraints=constraints, forward_refs=forward_refs, global_vars=global_vars, - force_clear_refs=force_clear_refs + force_clear_refs=force_clear_refs, + bound=bound ) elif annotation: if isinstance(annotation, type): @@ -1417,7 +1449,8 @@ def parse_annotation( constraints=constraints, forward_refs=forward_refs, global_vars=global_vars, - force_clear_refs=force_clear_refs + force_clear_refs=force_clear_refs, + bound=bound ) else: # no constraints, we can directly use it @@ -1430,7 +1463,8 @@ def parse_annotation( constraints=constraints, forward_refs=forward_refs, global_vars=global_vars, - force_clear_refs=force_clear_refs + force_clear_refs=force_clear_refs, + bound=bound ) return None @@ -1439,46 +1473,138 @@ def check_type(cls, t): return True @classmethod - def merge_type(cls, t): + def _get_origin(cls, t): + rule = t + while (isinstance(rule, type) and issubclass(rule, Rule) and + not rule.__args__ and not rule.__validators__): + rule = rule.__origin__ + return rule + + @classmethod + def merge_type(cls, t, strict: bool = False): + rule = cls._get_origin(cls) + 
t = cls._get_origin(t) + if not t or not isinstance(t, type): return cls if not cls.__origin__: return t - if cls.combinator: - return t & cls - if cls.__origin__ and issubclass(cls.__origin__, t): - return cls + + # if cls.combinator or isinstance(t, LogicalType) and t.combinator: + # return t & cls + # if cls.__origin__ and issubclass(cls.__origin__, t): + # return cls + constraints = {} for name, val, func in cls.__validators__: constraints[name] = getattr(cls, name, val) # do not lose the mode info # so we get value from cls first - # try to find a strong constraint if 'const' in constraints or 'enum' in constraints: return cls + origin = rule.__origin__ if (isinstance(rule, type) and issubclass(rule, Rule)) else rule + args = (rule.__args__ or []) if (isinstance(rule, type) and issubclass(rule, Rule)) else [] + + if isinstance(origin, type): + if isinstance(t, type): + if issubclass(origin, t): + return rule + # elif issubclass(t, origin): + else: + if not issubclass(t, origin): + if issubclass(t, Rule): + if not issubclass(t.__origin__, origin): + if strict: + raise TypeError(f'Invalid type merge: {t.__origin__}, {origin}') + elif strict: + raise TypeError(f'Invalid type merge: {t}, {origin}') + origin = t + + if isinstance(rule, LogicalType) and rule.combinator: + if rule.combinator == '|': + # Union + # narrow the condition + if isinstance(t, LogicalType) and t.combinator == '|': + # find common args + common_args = [] + for arg in rule.args: + if arg in t.args: + common_args.append(arg) + else: + arg_rule = arg if isinstance(arg, Rule) else Rule.annotate(arg) + for t_arg in t.args: + try: + common_args.append(arg_rule.merge_type(t_arg, strict=True)) + break + except (TypeError, ValueError): + continue + if len(common_args) > 1: + return LogicalType.any_of(*common_args) + elif len(common_args) == 1: + if common_args[0] != type(None): + return common_args[0] + return t + + for arg in rule.args: + if arg == t: + return arg + try: + arg_rule = arg if 
isinstance(arg, Rule) else Rule.annotate(arg) + return arg_rule.merge_type(t, strict=True) + except (TypeError, ValueError): + continue + else: + return rule & t + if isinstance(t, LogicalType): if t.combinator: - return t & cls + if t.combinator == '|': + common_args = [] + _rule = rule if isinstance(rule, Rule) else Rule.annotate(rule) + + for arg in t.args: + if arg == type(None): + common_args.append(arg) + continue + if arg == rule: + common_args.append(arg) + continue + if isinstance(origin, type) and issubclass(origin, arg): + common_args.append(rule) + continue + try: + common_args.append(_rule.merge_type(arg, strict=True)) + except (TypeError, ValueError): + continue + + if len(common_args) > 1: + return LogicalType.any_of(*common_args) + elif len(common_args) == 1: + if common_args[0] != type(None): + return common_args[0] + return t + else: + return t & rule + elif issubclass(t, Rule): for name, val, func in t.__validators__: constraints[name] = getattr(t, name, val) if 'const' in constraints or 'enum' in constraints: return t return Rule.annotate( - t.__origin__ or cls.__origin__, - *(t.__args__ or cls.__args__ or []), + t.__origin__ or origin, + *(t.__args__ or args), constraints=constraints ) elif issubclass(t, Enum): # do not need to apply constraint to a strong type return t - args = cls.__args__ or [] if not args and not constraints: return t return Rule.annotate( - t, + origin, *args, constraints=constraints ) @@ -1486,7 +1612,7 @@ def merge_type(cls, t): @classmethod def parse(cls, value, context: RuntimeContext = None): # use __options__ instead of options is to identify much clearer with other subclass init kwargs - context = context or cls.context_cls() + context = context or cls.context_cls(options=cls.__options__) options = context.options # IMPORTANT: # we must do clone here (as the parser do make_runtime) @@ -1697,9 +1823,10 @@ def _parse_tuple_args(cls, value: tuple, context: RuntimeContext): result = [] options = context.options - if 
options.no_data_loss and len(value) > len(cls.__args__): - for item in range(len(cls.__args__), len(value)): - context.handle_error(exc.TupleExceedError(item=item, value=value[item])) + if len(value) > len(cls.__args__): + if options.addition is False or options.no_data_loss: + for item in range(len(cls.__args__), len(value)): + context.handle_error(exc.TupleExceedError(item=item, value=value[item])) for i, (arg, func) in enumerate(zip(cls.__args__, cls.__arg_transformers__)): if i >= len(value): @@ -1724,6 +1851,27 @@ def _parse_tuple_args(cls, value: tuple, context: RuntimeContext): continue context.handle_error(error) + if options.addition: + if isinstance(options.addition, type): + for _i, addition_param in enumerate(value[len(cls.__args__):]): + i = _i + len(cls.__args__) + with context.enter(route=i) as arg_context: + try: + result.append( + arg_context.transformer.apply(value[i], options.addition) + ) + except Exception as e: + error = exc.ParseError( + item=i, value=value[i], type=options.addition, origin_exc=e + ) + if options.invalid_items == options.PRESERVE: + context.collect_waring(error.formatted_message) + result.append(value[i]) + continue + context.handle_error(error) + else: + result.extend(value[len(cls.__args__):]) + return cls.__origin__(result) @classmethod diff --git a/utype/specs/json_schema/__init__.py b/utype/specs/json_schema/__init__.py new file mode 100644 index 0000000..1648b95 --- /dev/null +++ b/utype/specs/json_schema/__init__.py @@ -0,0 +1,2 @@ +from .parser import JsonSchemaParser, JsonSchemaGroupParser +from .generator import JsonSchemaGenerator diff --git a/utype/specs/json_schema/constant.py b/utype/specs/json_schema/constant.py new file mode 100644 index 0000000..8381506 --- /dev/null +++ b/utype/specs/json_schema/constant.py @@ -0,0 +1,118 @@ +from decimal import Decimal +from datetime import datetime, date, time, timedelta +from uuid import UUID +from ipaddress import IPv4Address, IPv6Address +from utype.parser.rule import 
SEQ_TYPES, MAP_TYPES + +PRIMITIVES = ("null", "boolean", "object", "array", "integer", "number", "string") +PRIMITIVE_MAP = { + type(None): "null", + bool: "boolean", + MAP_TYPES: "object", + SEQ_TYPES: "array", + int: "integer", + (float, Decimal): "number", +} +TYPE_MAP = { + 'null': type(None), + 'boolean': bool, + 'bool': bool, + 'object': dict, + 'array': list, + 'integer': int, + 'int': int, + 'number': float, + 'float': float, + 'decimal': Decimal, + 'binary': bytes, + 'ipv4': IPv4Address, + 'ipv6': IPv6Address, + 'date-time': datetime, + 'date': date, + 'time': time, + 'duration': timedelta, + 'uuid': UUID, +} +OPERATOR_NAMES = { + "&": "allOf", + "|": "anyOf", + "^": "oneOf", + "~": "not", +} +FORMAT_MAP = { + (bytes, bytearray, memoryview): 'binary', + float: 'float', + IPv4Address: 'ipv4', + IPv6Address: 'ipv6', + datetime: 'date-time', + date: 'date', + time: 'time', + timedelta: 'duration', + UUID: 'uuid' +} +DEFAULT_CONSTRAINTS_MAP = { + 'enum': 'enum', + 'const': 'const', +} +CONSTRAINTS_MAP = { + 'multipleOf': 'multiple_of', + 'maximum': 'le', + 'minimum': 'ge', + 'exclusiveMaximum': 'lt', + 'exclusiveMinimum': 'gt', + 'decimalPlaces': 'decimal_places', + 'maxDigits': 'max_digits', + 'enum': 'enum', + 'const': 'const', + 'maxItems': 'max_length', + 'minItems': 'min_length', + 'uniqueItems': 'unique_items', + 'maxContains': 'max_contains', + 'minContains': 'min_contains', + 'contains': 'contains', + 'maxProperties': 'max_length', + 'minProperties': 'min_length', + 'minLength': 'min_length', + 'maxLength': 'max_length', + 'pattern': 'regex', +} + +TYPE_CONSTRAINTS_MAP = { + ("integer", "number"): { + 'multiple_of': 'multipleOf', + 'le': 'maximum', + 'lt': 'exclusiveMaximum', + 'ge': 'minimum', + 'gt': 'exclusiveMinimum', + 'decimal_places': 'decimalPlaces', + 'max_digits': 'maxDigits', + **DEFAULT_CONSTRAINTS_MAP, + }, + ("array",): { + 'max_length': 'maxItems', + 'min_length': 'minItems', + 'unique_items': 'uniqueItems', + 'max_contains': 
'maxContains', + 'min_contains': 'minContains', + 'contains': 'contains', + **DEFAULT_CONSTRAINTS_MAP, + }, + ("object",): { + 'max_length': 'maxProperties', + 'min_length': 'minProperties', + **DEFAULT_CONSTRAINTS_MAP, + }, + ("string",): { + 'regex': 'pattern', + 'max_length': 'maxLength', + 'min_length': 'minLength', + **DEFAULT_CONSTRAINTS_MAP, + }, + ("boolean", "null"): DEFAULT_CONSTRAINTS_MAP +} + +FORMAT_PATTERNS = { + 'integer': r'[-]?\d+', + 'number': r'[-]?\d+(\.\d+)?', + 'date': r'\d{4}-\d{2}-\d{2}', +} diff --git a/utype/specs/json_schema.py b/utype/specs/json_schema/generator.py similarity index 81% rename from utype/specs/json_schema.py rename to utype/specs/json_schema/generator.py index 753d883..952320f 100644 --- a/utype/specs/json_schema.py +++ b/utype/specs/json_schema/generator.py @@ -1,92 +1,20 @@ import inspect -import warnings - from utype.parser.rule import Rule, LogicalType, SEQ_TYPES, MAP_TYPES from utype.parser.field import ParserField from utype.parser.cls import ClassParser from utype.parser.func import FunctionParser from utype.parser.base import Options -from decimal import Decimal -from datetime import datetime, date, time, timedelta -from uuid import UUID -from ipaddress import IPv4Address, IPv6Address + from typing import Optional, Type, Union, Dict -from ..utils.datastructures import unprovided -from ..utils.compat import JSON_TYPES +from utype.utils.datastructures import unprovided +from utype.utils.compat import JSON_TYPES from enum import EnumMeta +from . 
import constant class JsonSchemaGenerator: # pass in a defs dict to generate re-use '$defs' - PRIMITIVES = ("null", "boolean", "object", "array", "integer", "number", "string") - PRIMITIVE_MAP = { - type(None): "null", - bool: "boolean", - MAP_TYPES: "object", - SEQ_TYPES: "array", - int: "integer", - (float, Decimal): "number", - } - OPERATOR_NAMES = { - "&": "allOf", - "|": "anyOf", - "^": "oneOf", - "~": "not", - } - FORMAT_MAP = { - (bytes, bytearray, memoryview): 'binary', - float: 'float', - IPv4Address: 'ipv4', - IPv6Address: 'ipv6', - datetime: 'date-time', - date: 'date', - time: 'time', - timedelta: 'duration', - UUID: 'uuid' - } - DEFAULT_CONSTRAINTS_MAP = { - 'enum': 'enum', - 'const': 'const', - } - TYPE_CONSTRAINTS_MAP = { - ("integer", "number"): { - 'multiple_of': 'multipleOf', - 'le': 'maximum', - 'lt': 'exclusiveMaximum', - 'ge': 'minimum', - 'gt': 'exclusiveMinimum', - 'decimal_places': 'decimalPlaces', - 'max_digits': 'maxDigits', - **DEFAULT_CONSTRAINTS_MAP, - }, - ("array",): { - 'max_length': 'maxItems', - 'min_length': 'minItems', - 'unique_items': 'uniqueItems', - 'max_contains': 'maxContains', - 'min_contains': 'minContains', - 'contains': 'contains', - **DEFAULT_CONSTRAINTS_MAP, - }, - ("object",): { - 'max_length': 'maxProperties', - 'min_length': 'minProperties', - **DEFAULT_CONSTRAINTS_MAP, - }, - ("string",): { - 'regex': 'pattern', - 'max_length': 'maxLength', - 'min_length': 'minLength', - **DEFAULT_CONSTRAINTS_MAP, - }, - ("boolean", "null"): DEFAULT_CONSTRAINTS_MAP - } - - FORMAT_PATTERNS = { - 'integer': r'[-]?\d+', - 'number': r'[-]?\d+(\.\d+)?', - 'date': r'\d{4}-\d{2}-\d{2}', - } + DEFAULT_PRIMITIVE = "string" DEFAULT_REF_PREFIX = "#/$defs/" @@ -153,7 +81,7 @@ def generate_for_type(self, t: type): return data def generate_for_logical(self, t: LogicalType): - operator_name = self.OPERATOR_NAMES.get(t.combinator) + operator_name = constant.OPERATOR_NAMES.get(t.combinator) if not operator_name: return {} conditions = 
[self.generate_for_type(cond) for cond in t.args] @@ -167,7 +95,7 @@ def _get_format(self, origin: type) -> Optional[str]: format = getattr(origin, 'format', None) if format and isinstance(format, str): return format - for types, f in self.FORMAT_MAP.items(): + for types, f in constant.FORMAT_MAP.items(): if issubclass(origin, types): return f return None @@ -175,7 +103,7 @@ def _get_format(self, origin: type) -> Optional[str]: def _get_primitive(self, origin: type) -> str: if not origin: return self.DEFAULT_PRIMITIVE - for types, pri in self.PRIMITIVE_MAP.items(): + for types, pri in constant.PRIMITIVE_MAP.items(): if issubclass(origin, types): return pri return self.DEFAULT_PRIMITIVE @@ -206,7 +134,7 @@ def _get_args(self, r: Type[Rule]) -> dict: if not pattern: fmt = key_arg.get('format') or key_arg.get('type') if fmt: - pattern = self.FORMAT_PATTERNS.get(fmt) + pattern = constant.FORMAT_PATTERNS.get(fmt) pattern = pattern or '.*' return {name: {pattern: val_arg}} else: @@ -235,7 +163,7 @@ def generate_for_rule(self, t: Type[Rule]): origin = t.__origin__ data = dict(self.generate_for_type(origin)) primitive = getattr(t, 'primitive', None) - if primitive in self.PRIMITIVES: + if primitive in constant.PRIMITIVES: data.update(type=primitive) else: primitive = data.get('type', self.DEFAULT_PRIMITIVE) @@ -250,8 +178,8 @@ def generate_for_rule(self, t: Type[Rule]): data.update(format=fmt) # constraints - constrains_map = self.DEFAULT_CONSTRAINTS_MAP - for types, mp in self.TYPE_CONSTRAINTS_MAP.items(): + constrains_map = constant.DEFAULT_CONSTRAINTS_MAP + for types, mp in constant.TYPE_CONSTRAINTS_MAP.items(): if primitive in types: constrains_map = mp break @@ -435,11 +363,3 @@ def generate_for_function(self, f): else: data.update(additionalParameters=addition) return data - -# REVERSE ACTION OF GENERATE: -# --- GENERATE Schema and types based on Json schema - - -class JsonSchemaParser: - def __init__(self, json_schema: dict): - pass diff --git 
a/utype/specs/json_schema/parser.py b/utype/specs/json_schema/parser.py new file mode 100644 index 0000000..74b2adc --- /dev/null +++ b/utype/specs/json_schema/parser.py @@ -0,0 +1,298 @@ +from typing import Dict, Union, Tuple, Any, List, Type +from utype.utils.compat import ForwardRef +from utype.parser.rule import LogicalType, Rule +from utype.parser.field import Field +from utype.schema import LogicalMeta, Schema, DataClass +from utype.parser.options import Options +from utype.utils.datastructures import unprovided +from . import constant + +_type = type + + +class JsonSchemaParser: + object_base_cls = Schema + object_meta_cls = LogicalMeta + object_options_cls = Options + field_cls = Field + default_type = str + + def __init__(self, json_schema: dict, + refs: Dict[str, type] = None, + name: str = None, + description: str = None, + # '#/components/...': SchemaClass + # names: Dict[str, type] = None, + ref_prefix: str = None, # '#/components/schemas' + def_prefix: str = None, # 'schemas' + type_map: dict = None, + ): + + if not isinstance(json_schema, dict): + raise TypeError(f'Invalid json schema: {json_schema}') + self.json_schema = json_schema + self.refs = refs + self.name = name + self.description = description + self.ref_prefix = (ref_prefix.rstrip('/') + '/') if ref_prefix else '' + self.def_prefix = (def_prefix.rstrip('.') + '.') if def_prefix else '' + _type_map = dict(constant.TYPE_MAP) + if type_map: + _type_map.update(type_map) + self.type_map = _type_map + + def get_def_name(self, ref: str) -> str: + ref_name = ref.lstrip(self.ref_prefix) + return self.def_prefix + ref_name + + # def parse_type(self, schema: dict) -> type: + # return self.__class__( + # json_schema=schema, + # refs=self.refs, + # ref_prefix=self.ref_prefix, + # def_prefix=self.def_prefix, + # ).parse(type_only=True) + + @classmethod + def get_constraints(cls, schema: dict): + constraints = {} + for key, val in schema.items(): + if key in constant.CONSTRAINTS_MAP: + 
def parse_type(self, schema: dict,
               name: str = None,
               description: str = None,
               with_constraints: bool = True):
    """Convert one JSON-schema node into a Python type.

    Resolution order:

    1. ``$ref`` -> a ``ForwardRef`` to the name given by :meth:`get_def_name`.
    2. ``type`` -> ``array`` / ``object`` are delegated to :meth:`parse_array`
       / :meth:`parse_object`; any other value is looked up in ``type_map``,
       first by ``format`` and then by ``type``.
    3. ``const`` (or the first ``enum`` member) -> the Python type of that value.
    4. ``anyOf`` / ``oneOf`` / ``allOf`` / ``not`` -> the matching
       ``LogicalType`` combination of the parsed sub-schemas.

    Anything else resolves to ``default_type``.

    :param schema: the JSON-schema node to convert
    :param name: name passed on to the generated type
    :param description: description passed on to the generated type
    :param with_constraints: collect constraint keywords with
        :meth:`get_constraints` and attach them to the result
    :return: the resolved type, wrapped by ``Rule.annotate`` when constraints
        were collected
    """
    ref = schema.get('$ref')
    # NOTE: these locals must not be called ``type`` / ``format``: the branch
    # for const/enum values below needs the builtin ``type``.
    schema_type = schema.get('type')
    any_of = schema.get('anyOf')
    one_of = schema.get('oneOf')
    all_of = schema.get('allOf')
    not_of = schema.get('not')
    const = schema.get('const', unprovided)
    enum = schema.get('enum')
    conditions = any_of or one_of or all_of or ([not_of] if not_of else [])
    value = const if not unprovided(const) else enum[0] if enum else unprovided

    if ref:
        return ForwardRef(self.get_def_name(ref))

    constraints = {}
    if with_constraints:
        constraints = self.get_constraints(schema)

    t = self.default_type
    if schema_type:
        if schema_type == 'array':
            return self.parse_array(
                schema,
                name=name,
                description=description,
                constraints=constraints
            )
        if schema_type == 'object':
            return self.parse_object(
                schema,
                name=name,
                description=description,
                constraints=constraints
            )
        # a known format is more specific than the bare type
        schema_format = schema.get('format')
        t = self.type_map.get(schema_format) if schema_format else None
        t = t or self.type_map.get(schema_type) or self.default_type

    elif not unprovided(value):
        # no declared type: infer it from the const / first enum value
        t = type(value)
    elif conditions:
        condition_types = [self.parse_type(cond) for cond in conditions]
        if any_of:
            t = LogicalType.any_of(*condition_types)
        elif all_of:
            t = LogicalType.all_of(*condition_types)
        elif one_of:
            t = LogicalType.one_of(*condition_types)
        elif not_of:
            t = LogicalType.not_of(*condition_types)

    if constraints:
        return Rule.annotate(
            t,
            name=name,
            description=description,
            constraints=constraints
        )
    return t
class JsonSchemaGroupParser:
    """Parse a named collection of JSON schemas into Python types.

    ``schemas`` maps each definition name to its JSON schema. Every entry is
    parsed with ``schema_parser_cls`` and the result is stored in ``refs``
    under ``ref_prefix + name``.
    """
    schema_parser_cls = JsonSchemaParser

    def __init__(self, schemas: Dict[str, dict],
                 ref_prefix: str = None,  # '#/components/schemas'
                 def_prefix: str = None,  # 'schemas'
                 ):
        """
        :param schemas: definition name -> JSON schema
        :param ref_prefix: prefix of the ``$ref`` pointers; a single trailing
            ``'/'`` is enforced
        :param def_prefix: prefix of the generated definition names; a single
            trailing ``'.'`` is enforced
        """
        self.schemas = schemas
        self.ref_prefix = (ref_prefix.rstrip('/') + '/') if ref_prefix else ''
        self.def_prefix = (def_prefix.rstrip('.') + '.') if def_prefix else ''
        # '<ref_prefix><name>' -> parsed type, filled in by parse()
        self.refs = {}

    def __call__(self, *args, **kwargs):
        """Run :meth:`parse` and return its result.

        Calling the group parser was previously a silent no-op returning
        ``None``, unlike the single-schema parser whose ``__call__`` performs
        the parse.
        """
        return self.parse()

    def parse(self):
        """Parse every schema of the group.

        :return: the ``refs`` mapping of ``ref_prefix + name`` to the parsed type
        """
        for name, schema in self.schemas.items():
            parser = self.schema_parser_cls(
                json_schema=schema,
                name=name,
                refs=self.refs,
                ref_prefix=self.ref_prefix,
                def_prefix=self.def_prefix
            )
            self.refs[self.ref_prefix + name] = parser()
        return self.refs
mode: str = None, + # output: bool = False + ): + self.t = t + self.defs = defs or {} + + def __call__(self) -> str: + if inspect.isfunction(self.t): + return self.generate_for_function(self.t) + return self.generate_for_type(self.t, with_constraints=True, annotation=False) + + def generate_for_function(self, f) -> str: + pass + + def generate_for_type(self, t, with_constraints: bool = True, annotation: bool = True) -> str: + if t is None: + return 'Any' + if isinstance(t, str): + return t + if isinstance(t, ForwardRef): + return repr(t.__forward_arg__) + if not isinstance(t, type): + return 'Any' + if isinstance(t, LogicalType): + if t.combinator: + arg_list = [self.generate_for_type(arg, with_constraints=with_constraints, annotation=True) + for arg in t.args] + if not arg_list: + return 'Any' + if t.combinator == '|': + if len(t.args) == 2 and type(None) in t.args: + index = 0 if t.args[1] is type(None) else 1 + return f'Optional[{arg_list[index]}]' + return f'Union[%s]' % ', '.join(arg_list) + elif t.combinator == '~': + return '~' + arg_list[0] + return str(f' {t.combinator} ').join(arg_list) + elif issubclass(t, Rule): + return self.generate_for_rule(t, with_constraints=with_constraints, annotation=annotation) + elif isinstance(getattr(t, "__parser__", None), ClassParser) and not annotation: + return self.generate_for_dataclass(t) + return represent(t) if t else 'Any' + + def generate_for_rule(self, t: Type[Rule], with_constraints: bool = True, annotation: bool = True) -> str: + constraints = {} + if with_constraints: + for name, val, func in t.__validators__: + constraints[name] = getattr(t, name, val) + origin = t.__origin__ + args = [] + if t.__args__: + origin_str = ORIGIN_MAP.get(origin) or self.generate_for_type(origin, with_constraints=False) + args = [self.generate_for_type(arg, with_constraints=True) for arg in t.__args__] + type_str = f'{origin_str}[%s]' % (', '.join(args)) + else: + type_str = self.generate_for_type(origin, with_constraints=False) + 
@classmethod
def get_attname(cls, name: str, excludes: list = None) -> str:
    """Derive a valid Python attribute name from an arbitrary field name.

    * every run of characters outside ``[A-Za-z0-9]`` in a non-identifier
      name is replaced by a single ``'_'``;
    * a name that is still not an identifier (empty, or starting with a
      digit) is prefixed with ``'key_'``;
    * a Python keyword gets a trailing ``'_'``;
    * ``'_1'`` is appended until the name is not in *excludes*.

    :param name: the original field name
    :param excludes: attribute names that are already taken
    :return: an identifier that is not a keyword and not in *excludes*
    """
    # Only names that are NOT identifiers need sanitising. The condition used
    # to be inverted, so 'my-key' skipped the substitution and ended up as the
    # still-invalid 'key_my-key'.
    if not name.isidentifier():
        name = re.sub('[^A-Za-z0-9]+', '_', name)
    if not name.isidentifier():
        name = 'key_' + name
    elif keyword.iskeyword(name):
        name = name + '_'
    if excludes:
        while name in excludes:
            name = name + '_1'
    return name
self.generate_for_type(field.type, with_constraints=False, annotation=True) + if force_forward_ref and not isinstance(field.type, ForwardRef): + type_str = repr(type_str) + addition = dict(self.get_constraints(field.type)) + if not valid_attr(attname): + attname = self.get_attname(attname, excludes=attr_names) + addition.update(alias=name) + field_str = self.generate_for_field(field.field, addition=addition) or None + attr_names.append(attname) + parts = [attname] + if type_str: + parts.extend([f': {type_str}']) + if field_str: + parts.extend([f' = {field_str}']) + attrs.append('\t' + ''.join(parts)) + lines.extend(attrs) + if len(lines) == 1: + lines.append('\tpass') + return '\n'.join(lines) diff --git a/utype/types.py b/utype/types.py index e1af859..3e7e382 100644 --- a/utype/types.py +++ b/utype/types.py @@ -4,7 +4,7 @@ from uuid import UUID from typing import Union, Optional, Tuple, List, Set, Mapping, \ Dict, Type, Callable, Any, TYPE_CHECKING, Iterator, ClassVar -from .utils.compat import Literal, Annotated, Final, ForwardRef +from .utils.compat import Literal, Annotated, Final, ForwardRef, Self from .parser.rule import Lax, Rule # from typing import TypeVar diff --git a/utype/utils/base.py b/utype/utils/base.py index 19c3a01..4fc8ba9 100644 --- a/utype/utils/base.py +++ b/utype/utils/base.py @@ -1,5 +1,10 @@ import inspect -from typing import Callable, Optional +from typing import Callable, Optional, Dict, Any, TypeVar, List +from .datastructures import ImmutableDict +from .functional import represent, multi, distinct_add + +T = TypeVar('T') +SEG = '__' class TypeRegistry: @@ -100,3 +105,204 @@ def resolve(self, t: type) -> Optional[Callable]: # default to base return self.base.resolve(t) return self.default + + +class ParamsCollectorMeta(type): + def __init__(cls, name, bases: tuple, attrs: dict, **kwargs): + super().__init__(name, bases, attrs) + + __init = attrs.get('__init__') # only track current init + + cls._kwargs = kwargs + cls._pos_var = None + 
def __init__(self, __params__: Dict[str, Any]):
    """Record the arguments an instance was constructed with.

    *__params__* maps parameter names to the values they received. It is
    sorted into:

    * ``__args__`` - values of the ``*args`` parameter and of the
      positional-only parameters;
    * ``__kwargs__`` - values of the keyword parameters plus the content of
      the ``**kwargs`` parameter;
    * ``__spec_kwargs__`` - the values that differ from the declared
      defaults, plus the content of the ``**kwargs`` parameter.

    Names wrapped in double underscores, values that are the instance
    itself, and names not declared by the signature are ignored.

    :param __params__: parameter name -> value
    """
    args = []
    kwargs = {}
    spec = {}

    for key, val in __params__.items():
        if key.startswith(SEG) and key.endswith(SEG):
            continue
        if val is self:
            continue
        if key == self._pos_var:
            args += list(val)
            continue
        elif key == self._key_var:
            if isinstance(val, dict):
                _kwargs = {k: v for k, v in val.items() if not k.startswith(SEG)}
                kwargs.update(_kwargs)
                spec.update(_kwargs)  # also update spec
            continue
        elif key in self._pos_keys:
            # store the argument VALUE: __args__ is unpacked positionally by
            # __copy__ and rendered by _repr, so appending the parameter
            # name (as was done before) fed the name back in as the value
            args.append(val)
        elif key in self._kw_keys:
            kwargs[key] = val
        else:
            continue
        if val != self._defaults.get(key):
            # only values that differ from the declared default are "specified"
            spec[key] = val

    self.__args__ = tuple(args)
    self.__kwargs__ = kwargs
    self.__spec_kwargs__ = ImmutableDict(spec)
    self.__name__ = self._get_cls_name()
class ImmutableDict(dict):
    """A ``dict`` whose content cannot be changed after construction.

    Every mutating operation raises ``AttributeError``.
    """

    def __error__(self, *args, **kwargs):
        raise AttributeError("ImmutableDict can not modify value")

    __delitem__ = __error__
    __setitem__ = __error__
    # ``d |= other`` (Python 3.9+) is implemented by dict.__ior__, which does
    # not go through the overridden update(), so it has to be blocked as well
    __ior__ = __error__

    def __str__(self):
        return f'{self.__class__.__name__}({super().__repr__()})'

    def __repr__(self):
        return f'{self.__class__.__name__}({super().__repr__()})'

    setdefault = __error__
    pop = __error__
    popitem = __error__
    clear = __error__
    update = __error__
def valid_attr(name: str):
    """Tell whether *name* can be used as-is as a Python attribute name.

    :param name: the candidate name
    :return: ``True`` when *name* is an identifier and not a reserved keyword
    """
    import keyword

    if not name.isidentifier():
        return False
    return not keyword.iskeyword(name)