Skip to content

Commit

Permalink
Lint
Browse files Browse the repository at this point in the history
  • Loading branch information
jlubken committed Jun 2, 2020
1 parent a35f074 commit 69cc7dc
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 66 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test = pytest
max-complexity = 10
max-line-length = 79
exclude = ci,docs
ignore = C812,C815,C816,D202,D401,E203,W503
ignore = C812,C815,C816,C819,D202,D401,E203,W503


[matrix]
Expand Down
130 changes: 91 additions & 39 deletions src/dsdk/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,19 @@ def _inject_mongo_uri(mongo_uri: str) -> str:
parser.add(
"--mongo-uri",
required=True,
help=(
"Mongo URI used to connect to a Mongo database: "
"mongodb://USER:PASS@HOST1,HOST2,.../DATABASE?"
"replicaset=REPLICASET&authsource=admin "
"Url encode all parts: PASS in particular"
help=" ".join(
(
"Mongo URI used to connect to a Mongo database:",
(
"mongodb://USER:PASSWORD@HOST1,HOST2,.../DATABASE?"
"replicaset=REPLICASET&authsource=admin"
),
"Use a valid uri."
"Url encode all parts, but do not encode the entire uri.",
"No unencoded colons, ampersands, slashes,",
"question-marks, etc. in parts.",
"Specifically, check url encoding of PASSWORD.",
)
),
env_var="MONGO_URI",
type=_inject_mongo_uri,
Expand All @@ -86,6 +94,21 @@ def open_mongo(self) -> Generator:
class EvidenceMixin(Mixin):
"""Evidence Mixin."""

RESULTSET_ERROR = "".join(
(
"{",
", ".join(
(
'"key": "mongo.resultset.error"',
'"collection": "%s.%s"',
'"actual": %s',
'"expected": %s',
)
),
"}",
)
)

def __init__(self, **kwargs):
"""__init__."""
super().__init__(**kwargs)
Expand All @@ -101,21 +124,11 @@ def open_batch(
doc = batch.as_insert_doc(model) # <- model dependency
with self.open_mongo() as database:
key = insert_one(database.batches, doc)
logger.info(
f'"action": "insert", '
f'"database": "{database.name}", '
f'"collection": "{database.collection.name}"'
)
yield batch

key, doc = batch.as_update_doc()
with self.open_mongo() as database:
update_one(database.batches, key, doc)
logger.info(
f'"action": "update", '
f'"database": "{database.name}", '
f'"collection": "{database.collection.name}"'
)

def store_evidence(self, batch: Batch, *args, **kwargs) -> None:
"""Store Evidence."""
Expand All @@ -129,22 +142,23 @@ def store_evidence(self, batch: Batch, *args, **kwargs) -> None:
columns = df[[c for c in df.columns if c not in exclude]]
docs = columns.to_dict(orient="records")
with self.open_mongo() as database:
result = insert_many(database[key], docs)
assert columns.shape[0] == len(result.inserted_ids), (
'"action" "insert_many", "database": "%s", "collection": \
"%s", "message": "columns.shape[0] != \
len(results.inserted_ids)"'
% (database.name, database.collection.name)
collection = database[key]
result = insert_many(collection, docs)
actual = len(result.inserted.ids)
expected = columns.shape[0]
assert actual == expected, self.RESULTSET_ERROR % (
database.name,
collection.name,
actual,
expected,
)

# TODO: Better exception
df.drop(columns=["batch_id"], inplace=True)
logger.info(
f'"action": "insert_many", '
f'"database": "{database.name}", '
f'"collection": "{database.collection.name}", '
f'"count": {len(df.index)}'
)


OPEN = '{"key": "mongo.open", "database": "%s", "is_master": "%s" }'
CLOSE = '{"key": "mongo.close", "database": "%s"}'


@contextmanager
Expand All @@ -171,36 +185,74 @@ def open_database(
**kwargs,
) as client:
database = client.get_database()
# is_master to force lazy connection open
# force lazy connection open
is_master = client.admin.command("ismaster")
logger.debug(
'{"opened_mongo_database: {"name": "%s", "is_master": "%s"}}',
database.name,
is_master,
)
logger.info(OPEN, database.name, is_master)
try:
yield database
finally:
logger.debug(
'{"close_mongo_database: {"name": "%s"}}', database.name
)
logger.info(CLOSE, database.name)


INSERT_ONE = "".join(
(
"{",
", ".join(('"key": "mongo.insert_one"', '"collection": "%s.%s"')),
"}",
)
)


@retry(AutoReconnect)
def insert_one(collection: Collection, doc: Dict[str, Any]):
"""Insert one with retry."""
return collection.insert_one(doc)
result = collection.insert_one(doc)
logger.info(INSERT_ONE, collection.database.name, collection.name)
return result


INSERT_MANY = "".join(
(
"{",
", ".join(
(
'"key": "mongo.insert_many"',
'"collection": "%s.%s"',
'"value": %s',
)
),
"}",
)
)


@retry(AutoReconnect)
def insert_many(collection: Collection, docs: Sequence[Dict[str, Any]]):
"""Insert many with retry."""
return collection.insert_many(docs)
result = collection.insert_many(docs)
logger.info(
INSERT_MANY,
collection.database.name,
collection.name,
len(result.inserted.ids),
)
return result


UPDATE_ONE = "".join(
(
"{",
", ".join(('"key": "mongo.update_one"', '"collection": "%s.%s"')),
"}",
)
)


@retry(AutoReconnect)
def update_one(
collection: Collection, key: Dict[str, Any], doc: Dict[str, Any]
):
"""Update one with retry."""
return collection.update_one(key, doc)
result = collection.update_one(key, doc)
logger.info(UPDATE_ONE, collection.database.name, collection.name)
return result
50 changes: 29 additions & 21 deletions src/dsdk/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,20 @@ def _inject_mssql_uri(mssql_uri: str) -> str:
parser.add(
"--mssql-uri",
required=True,
help=(
"MSSQL URI used to connect to a MSSQL database: "
"mssql+pymssql://USER:PASS@HOST:PORT/DATABASE?timeout=TIMEOUT "
"Url encode all parts: USER (domain slash), PASS in particular"
help=" ".join(
(
"MSSQL URI used to connect to a MSSQL database:",
(
"mssql+pymssql://USER:PASS@HOST:PORT/DATABASE?"
"timeout=TIMEOUT"
),
"Use a valid uri."
"Url encode all parts, but do not encode the entire uri.",
"No unencoded colons, ampersands, slashes,",
"question-marks, etc. in parts.",
"Specifically, check url encoding of USER (domain slash),"
"and PASSWORD.",
)
),
env_var="MSSQL_URI",
type=_inject_mssql_uri,
Expand All @@ -82,28 +92,26 @@ class CheckTablePrivileges(Task): # pylint: disable=too-few-public-methods
select 1 as n where exists (select 1 as n from {table})
"""

KEY = "table_privilege_check"
KEY = "mssql.table_privilege_check"

ON = "".join(("{", f'"key": "{KEY}.on"', "}"))

END = "".join(("{", f'"key": "{KEY}.end"', "}"))

COLUMN_PRIVILEGE = "".join(
(
"{",
", ".join(
(f'"key": "{KEY}.column_privilege_warning"', '"value": "%s"')
),
"}",
)
("{", ", ".join((f'"key": "{KEY}.warn"', '"value": "%s"')), "}",)
)

FAILED = "".join(
("{", ", ".join((f'"key": "{KEY}.failed"', '"value": "%s"')), "}")
ERROR = "".join(
("{", ", ".join((f'"key": "{KEY}.table.error"', '"table": "%s"')), "}")
)

FAILURES = "".join(
("{", ", ".join((f'"key": "{KEY}.failures"', '"value": "%s"')), "}")
ERRORS = "".join(
(
"{",
", ".join((f'"key": "{KEY}.tables.error"', '"tables": "%s"')),
"}",
)
)

def __init__(self, tables):
Expand All @@ -118,7 +126,7 @@ def __call__(self, batch, service):
cur = con.execute(self.CONNECT)
for _ in cur.fetchall():
pass
failures = []
errors = []
for table in self.tables:
sql = self.EXTANT.format(table=table)
try:
Expand All @@ -131,8 +139,8 @@ def __call__(self, batch, service):
if number == 230:
logger.info(self.COLUMN_PRIVILEGE, table)
continue
logger.warning(self.FAILED, table)
failures.append(table)
if bool(failures):
raise RuntimeError(self.FAILURES, failures)
logger.warning(self.ERROR, table)
errors.append(table)
if bool(errors):
raise RuntimeError(self.ERRORS, errors)
logger.info(self.END)
5 changes: 0 additions & 5 deletions src/dsdk/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def __init__(

def __call__(self) -> Batch:
"""Run."""
self.check()
with self.open_batch() as batch:
for task in self.pipeline:
task(batch, self)
Expand All @@ -132,10 +131,6 @@ def __call__(self) -> Batch:
)
return batch

def check(self) -> None:
"""Check."""
# TODO add smoke test for each database mixin.

def inject_arguments( # pylint: disable=no-self-use,protected-access
self, parser: ArgumentParser
) -> None:
Expand Down

0 comments on commit 69cc7dc

Please sign in to comment.