-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
9 changed files
with
647 additions
and
225 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
.idea | ||
__pycache__/ | ||
*.pyc | ||
venv/ | ||
venv* | ||
*.egg-info | ||
dist/ | ||
build/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ classifiers = [ | |
dependencies = [ | ||
"rich", | ||
"pydal", | ||
"typer", | ||
"configuraptor >= 1.15", | ||
] | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,235 +1,129 @@ | ||
""" | ||
CLI tool to generate SQL from PyDAL code. | ||
""" | ||
|
||
import argparse | ||
import pathlib | ||
import select | ||
import string | ||
import sys | ||
import textwrap | ||
import typing | ||
from typing import IO, Optional | ||
from typing import Annotated, Optional | ||
|
||
import rich | ||
from configuraptor import TypedConfig | ||
from rich.prompt import Prompt | ||
from rich.style import Style | ||
import typer | ||
from rich import print | ||
from typer import Argument | ||
from typing_extensions import Never | ||
|
||
from .helpers import flatten | ||
from .magic import find_missing_variables, generate_magic_code | ||
from .types import DATABASE_ALIASES | ||
from .__about__ import __version__ | ||
from .typer_support import ( | ||
DEFAULT_VERBOSITY, | ||
ApplicationState, | ||
Verbosity, | ||
create_enum_from_literal, | ||
) | ||
from .types import SUPPORTED_DATABASE_TYPES_WITH_ALIASES | ||
|
||
## type fuckery: | ||
|
||
class PrettyParser(argparse.ArgumentParser): # pragma: no cover | ||
""" | ||
Add 'rich' to the argparse output. | ||
""" | ||
|
||
def _print_message(self, message: str, file: Optional[IO[str]] = None) -> None: | ||
rich.print(message, file=file) | ||
DB_Types: typing.Any = create_enum_from_literal("DBType", SUPPORTED_DATABASE_TYPES_WITH_ALIASES) | ||
|
||
T = typing.TypeVar("T") | ||
|
||
def has_stdin_data() -> bool: # pragma: no cover | ||
""" | ||
Check if the program starts with cli data (pipe | or redirect ><). | ||
OptionalArgument = Annotated[Optional[T], Argument()] | ||
# usage: (myparam: OptionalArgument[some_type]) | ||
|
||
See Also: | ||
https://stackoverflow.com/questions/3762881/how-do-i-check-if-stdin-has-some-data | ||
""" | ||
return any( | ||
select.select( | ||
[ | ||
sys.stdin, | ||
], | ||
[], | ||
[], | ||
0.0, | ||
)[0] | ||
) | ||
|
||
|
||
def handle_cli( | ||
code: str, | ||
db_type: typing.Optional[str] = None, | ||
tables: typing.Optional[list[str] | list[list[str]]] = None, | ||
verbose: typing.Optional[bool] = False, | ||
noop: typing.Optional[bool] = False, | ||
magic: typing.Optional[bool] = False, | ||
) -> None: | ||
""" | ||
Handle user input. | ||
""" | ||
to_execute = string.Template( | ||
textwrap.dedent( | ||
""" | ||
from pydal import * | ||
from pydal.objects import * | ||
from pydal.validators import * | ||
### end typing stuff, start app: | ||
|
||
from pydal2sql import generate_sql | ||
app = typer.Typer() | ||
state = ApplicationState() | ||
|
||
db = database = DAL(None, migrate=False) | ||
|
||
tables = $tables | ||
db_type = '$db_type' | ||
def info(*args: str) -> None: | ||
""" | ||
'print' but with blue text. | ||
""" | ||
print(f"[blue]{' '.join(args)}[/blue]", file=sys.stderr) | ||
|
||
$extra | ||
|
||
$code | ||
def warn(*args: str) -> None: | ||
""" | ||
'print' but with yellow text. | ||
""" | ||
print(f"[yellow]{' '.join(args)}[/yellow]", file=sys.stderr) | ||
|
||
if not tables: | ||
tables = db._tables | ||
|
||
for table in tables: | ||
print(generate_sql(db[table], db_type=db_type)) | ||
def danger(*args: str) -> None: | ||
""" | ||
) | ||
) | ||
|
||
generated_code = to_execute.substitute( | ||
{"tables": flatten(tables or []), "db_type": db_type or "", "code": textwrap.dedent(code), "extra": ""} | ||
) | ||
if verbose or noop: | ||
rich.print(generated_code, file=sys.stderr) | ||
|
||
if not noop: | ||
try: | ||
exec(generated_code) # nosec: B102 | ||
except NameError: | ||
# something is missing! | ||
missing_vars = find_missing_variables(generated_code) | ||
if not magic: | ||
rich.print( | ||
f"Your code is missing some variables: {missing_vars}. Add these or try --magic", file=sys.stderr | ||
) | ||
else: | ||
extra_code = generate_magic_code(missing_vars) | ||
|
||
generated_code = to_execute.substitute( | ||
{ | ||
"tables": flatten(tables or []), | ||
"db_type": db_type or "", | ||
"extra": extra_code, | ||
"code": textwrap.dedent(code), | ||
} | ||
) | ||
|
||
if verbose: | ||
print(generated_code, file=sys.stderr) | ||
|
||
exec(generated_code) # nosec: B102 | ||
|
||
|
||
class CliConfig(TypedConfig): | ||
""" | ||
Configuration from pyproject.toml or cli. | ||
'print' but with red text. | ||
""" | ||
print(f"[red]{' '.join(args)}[/red]", file=sys.stderr) | ||
|
||
db_type: DATABASE_ALIASES | None | ||
verbose: bool | None | ||
noop: bool | None | ||
magic: bool | None | ||
filename: str | None = None | ||
tables: typing.Optional[list[str] | list[list[str]]] = None | ||
|
||
def __str__(self) -> str: | ||
""" | ||
Return as semi-fancy string for Debug. | ||
""" | ||
attrs = [f"\t{key}={value},\n" for key, value in self.__dict__.items()] | ||
classname = self.__class__.__name__ | ||
|
||
return f"{classname}(\n{''.join(attrs)})" | ||
|
||
def __repr__(self) -> str: | ||
""" | ||
Return as fancy string for Debug. | ||
""" | ||
attrs = [] | ||
for key, value in self.__dict__.items(): # pragma: no cover | ||
if key.startswith("_"): | ||
continue | ||
style = Style() | ||
if isinstance(value, str): | ||
style = Style(color="green", italic=True, bold=True) | ||
value = f"'{value}'" | ||
elif isinstance(value, bool) or value is None: | ||
style = Style(color="orange1") | ||
elif isinstance(value, int | float): | ||
style = Style(color="blue") | ||
attrs.append(f"\t{key}={style.render(value)},\n") | ||
|
||
classname = Style(color="medium_purple4").render(self.__class__.__name__) | ||
|
||
return f"{classname}(\n{''.join(attrs)})" | ||
|
||
|
||
def app() -> None: # pragma: no cover | ||
""" | ||
Entrypoint for the pydal2sql cli command. | ||
|
||
@app.command() | ||
def create(filename: OptionalArgument[str] = None, db_type: DB_Types = None) -> None: | ||
""" | ||
parser = PrettyParser( | ||
prog="pydal2sql", | ||
formatter_class=argparse.RawDescriptionHelpFormatter, | ||
description="""[green]CLI tool to generate SQL from PyDAL code.[/green]\n | ||
Aside from using cli arguments, you can also configure the tool in your code. | ||
You can set the following variables: | ||
db_type: str = 'sqlite' # your desired database type; | ||
tables: list[str] = [] # your desired tables to generate SQL for;""", | ||
epilog="Example: [i]cat models.py | pydal2sql sqlite[/i]", | ||
) | ||
Examples: | ||
pydal2sql create models.py | ||
cat models.py | pydal2sql | ||
pydal2sql # output from stdin | ||
""" | ||
print(filename) | ||
print(db_type.value if db_type else None) | ||
|
||
parser.add_argument("filename", nargs="?", help="Which file to load? Can also be done with stdin.") | ||
|
||
parser.add_argument( | ||
"db_type", nargs="?", help="Which database dialect to generate ([blue]postgres, sqlite, mysql[/blue])" | ||
) | ||
@app.command() | ||
def alter( | ||
filename_before: OptionalArgument[str] = None, | ||
filename_after: OptionalArgument[str] = None, | ||
db_type: DB_Types = None, | ||
) -> None: | ||
print(filename_before) | ||
print(filename_after) | ||
print(db_type.value if db_type else None) | ||
|
||
parser.add_argument("--verbose", "-v", help="Show more info", action=argparse.BooleanOptionalAction, default=False) | ||
|
||
parser.add_argument( | ||
"--noop", "-n", help="Only show code, don't run it.", action=argparse.BooleanOptionalAction, default=False | ||
) | ||
def show_config_callback() -> Never: | ||
""" | ||
--show-config requested! | ||
""" | ||
print(state) | ||
raise typer.Exit(0) | ||
|
||
parser.add_argument( | ||
"--magic", "-m", help="Perform magic to fix missing vars.", action=argparse.BooleanOptionalAction, default=False | ||
) | ||
|
||
parser.add_argument( | ||
"-t", | ||
"--tables", | ||
"--table", | ||
action="append", | ||
nargs="+", | ||
help="One or more tables to generate. By default, all tables in the file will be used.", | ||
) | ||
def version_callback() -> Never: | ||
""" | ||
--version requested! | ||
""" | ||
print(f"su6 Version: {__version__}") | ||
|
||
args = parser.parse_args() | ||
raise typer.Exit(0) | ||
|
||
config = CliConfig.load(key="tool.pydal2sql") | ||
|
||
config.fill(**args.__dict__) | ||
config.tables = args.tables or config.tables | ||
@app.callback(invoke_without_command=True) | ||
def main( | ||
ctx: typer.Context, | ||
config: str = None, | ||
verbosity: Verbosity = DEFAULT_VERBOSITY, | ||
# stops the program: | ||
show_config: bool = False, | ||
version: bool = False, | ||
) -> None: | ||
""" | ||
This callback will run before every command, setting the right global flags. | ||
db_type = args.db_type or args.filename or config.db_type | ||
Args: | ||
ctx: context to determine if a subcommand is passed, etc | ||
config: path to a different config toml file | ||
verbosity: level of detail to print out (1 - 3) | ||
load_file_mode = (filename := (args.filename or config.filename)) and filename.endswith(".py") | ||
show_config: display current configuration? | ||
version: display current version? | ||
if not (has_stdin_data() or load_file_mode): | ||
if not db_type: | ||
db_type = Prompt.ask("Which database type do you want to use?", choices=["sqlite", "postgres", "mysql"]) | ||
""" | ||
state.load_config(config_file=config, verbosity=verbosity) | ||
|
||
rich.print("Please paste your define tables code below and press ctrl-D when finished.", file=sys.stderr) | ||
if show_config: | ||
show_config_callback() | ||
elif version: | ||
version_callback() | ||
elif not ctx.invoked_subcommand: | ||
warn("Missing subcommand. Try `pydal2sql --help` for more info.") | ||
# else: just continue | ||
|
||
# else: data from stdin | ||
# py code or cli args should define settings. | ||
if load_file_mode and filename: | ||
db_type = args.db_type | ||
text = pathlib.Path(filename).read_text() | ||
else: | ||
text = sys.stdin.read() | ||
rich.print("---", file=sys.stderr) | ||
|
||
return handle_cli(text, db_type, config.tables, verbose=config.verbose, noop=config.noop, magic=config.magic) | ||
if __name__ == "__main__": | ||
app() |
Oops, something went wrong.