-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add items iterator #38
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
from collections import deque | ||
from collections import namedtuple | ||
import os | ||
import typing as t | ||
|
@@ -68,6 +69,12 @@ def __init__( | |
self.help_type = help_type | ||
self.help_default = help_default | ||
|
||
self._full_name = _normalized(name) # Will be set by the EnvMeta metaclass | ||
|
||
@property | ||
def full_name(self) -> str: | ||
return f"_{self._full_name}" if self.private else self._full_name | ||
|
||
def _cast(self, _type: t.Any, raw: str, env: "Env") -> t.Any: | ||
if _type is bool: | ||
return t.cast(T, raw.lower() in env.__truthy__) | ||
|
@@ -100,9 +107,7 @@ def _cast(self, _type: t.Any, raw: str, env: "Env") -> t.Any: | |
def _retrieve(self, env: "Env", prefix: str) -> T: | ||
source = env.source | ||
|
||
full_name = prefix + _normalized(self.name) | ||
if self.private: | ||
full_name = f"_{full_name}" | ||
full_name = self.full_name | ||
raw = source.get(full_name.format(**env.dynamic)) | ||
if raw is None and self.deprecations: | ||
for name, deprecated_when, removed_when in self.deprecations: | ||
|
@@ -167,10 +172,8 @@ def __call__(self, env: "Env", prefix: str) -> T: | |
try: | ||
self.validator(value) | ||
except ValueError as e: | ||
full_name = prefix + _normalized(self.name) | ||
raise ValueError( | ||
"Invalid value for environment variable %s: %s" % (full_name, e) | ||
) | ||
msg = f"Invalid value for environment variable {self.full_name}: {e}" | ||
raise ValueError(msg) | ||
|
||
return value | ||
|
||
|
@@ -191,7 +194,22 @@ def __call__(self, env: "Env") -> T: | |
return value | ||
|
||
|
||
class Env(object): | ||
class EnvMeta(type): | ||
def __new__( | ||
cls, name: str, bases: t.Tuple[t.Type], ns: t.Dict[str, t.Any] | ||
) -> t.Any: | ||
env = t.cast("Env", super().__new__(cls, name, bases, ns)) | ||
|
||
prefix = ns.get("__prefix__") | ||
if prefix: | ||
for v in env.values(recursive=True): | ||
if isinstance(v, EnvVariable): | ||
v._full_name = f"{_normalized(prefix)}_{v._full_name}".upper() | ||
|
||
return env | ||
|
||
|
||
class Env(metaclass=EnvMeta): | ||
"""Env base class. | ||
|
||
This class is meant to be subclassed. The configuration is declared by using | ||
|
@@ -336,26 +354,42 @@ def d( | |
return DerivedVariable(type, derivation) | ||
|
||
@classmethod | ||
def keys(cls) -> t.Iterator[str]: | ||
"""Return the names of all the items.""" | ||
return ( | ||
k | ||
for k, v in cls.__dict__.items() | ||
if isinstance(v, (EnvVariable, DerivedVariable)) | ||
or isinstance(v, type) | ||
and issubclass(v, Env) | ||
) | ||
def items( | ||
cls, recursive: bool = False, include_derived: bool = False | ||
) -> t.Iterator[t.Tuple[str, t.Union[EnvVariable, DerivedVariable]]]: | ||
classes = (EnvVariable, DerivedVariable) if include_derived else (EnvVariable,) | ||
q: t.Deque[t.Tuple[t.Tuple[str], t.Type["Env"]]] = deque() | ||
path: t.Tuple[str] = tuple() # type: ignore[assignment] | ||
q.append((path, cls)) | ||
while q: | ||
path, env = q.popleft() | ||
for k, v in env.__dict__.items(): | ||
if isinstance(v, classes): | ||
yield ( | ||
".".join((*path, k)), | ||
t.cast(t.Union[EnvVariable, DerivedVariable], v), | ||
) | ||
elif isinstance(v, type) and issubclass(v, Env) and recursive: | ||
item_name = getattr(v, "__item__", k) | ||
if item_name is None: | ||
item_name = k | ||
q.append(((*path, item_name), v)) # type: ignore[arg-type] | ||
|
||
@classmethod | ||
def values(cls) -> t.Iterator[t.Union[EnvVariable, DerivedVariable, t.Type["Env"]]]: | ||
"""Return the names of all the items.""" | ||
return ( | ||
v | ||
for v in cls.__dict__.values() | ||
if isinstance(v, (EnvVariable, DerivedVariable)) | ||
or isinstance(v, type) | ||
and issubclass(v, Env) | ||
) | ||
def keys( | ||
cls, recursive: bool = False, include_derived: bool = False | ||
) -> t.Iterator[str]: | ||
"""Return the name of all the configuration items.""" | ||
for k, _ in cls.items(recursive, include_derived): | ||
yield k | ||
|
||
@classmethod | ||
def values( | ||
cls, recursive: bool = False, include_derived: bool = False | ||
) -> t.Iterator[t.Union[EnvVariable, DerivedVariable, t.Type["Env"]]]: | ||
"""Return the value of all the configuration items.""" | ||
for _, v in cls.items(recursive, include_derived): | ||
yield v | ||
|
||
@classmethod | ||
def include( | ||
|
@@ -371,14 +405,6 @@ def include( | |
operation would result in some variables being overwritten. This can | ||
be disabled by setting the ``overwrite`` argument to ``True``. | ||
""" | ||
if namespace is not None: | ||
if not overwrite and hasattr(cls, namespace): | ||
raise ValueError("Namespace already in use: {}".format(namespace)) | ||
|
||
setattr(cls, namespace, env_spec) | ||
|
||
return None | ||
|
||
# Pick only the attributes that define variables. | ||
to_include = { | ||
k: v | ||
|
@@ -387,14 +413,36 @@ def include( | |
or isinstance(v, type) | ||
and issubclass(v, Env) | ||
} | ||
|
||
if not overwrite: | ||
overlap = set(cls.__dict__.keys()) & set(to_include.keys()) | ||
if overlap: | ||
raise ValueError("Configuration clashes detected: {}".format(overlap)) | ||
|
||
own_prefix = _normalized(getattr(cls, "__prefix__", "")) | ||
|
||
if namespace is not None: | ||
if not overwrite and hasattr(cls, namespace): | ||
raise ValueError("Namespace already in use: {}".format(namespace)) | ||
|
||
if getattr(cls, namespace, None) is not env_spec: | ||
setattr(cls, namespace, env_spec) | ||
|
||
if own_prefix: | ||
for _, v in to_include.items(): | ||
if isinstance(v, EnvVariable): | ||
v._full_name = f"{own_prefix}_{v._full_name}" | ||
Comment on lines
+432
to
+433
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 Code Quality Violationtoo many nesting levels (...read more)Avoid to nest too many loops together. Having too many loops make your code harder to understand. Learn More |
||
|
||
return None | ||
|
||
other_prefix = getattr(env_spec, "__prefix__", "") | ||
for k, v in to_include.items(): | ||
setattr(cls, k, v) | ||
if getattr(cls, k, None) is not v: | ||
setattr(cls, k, v) | ||
if isinstance(v, EnvVariable): | ||
if other_prefix: | ||
v._full_name = v._full_name[len(other_prefix) + 1 :] # noqa | ||
if own_prefix: | ||
v._full_name = f"{own_prefix}_{v._full_name}" | ||
Comment on lines
+444
to
+445
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 Code Quality Violationtoo many nesting levels (...read more)Avoid to nest too many loops together. Having too many loops make your code harder to understand. Learn More |
||
|
||
@classmethod | ||
def help_info( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟠 Code Quality Violation
Found too many nested ifs within this condition (...read more)
Too many nested loops make the code hard to read and understand. Simplify your code by removing nesting levels and separate code in small units.