Skip to content

Commit

Permalink
Merge pull request #13 from Netflix/specify-col-names
Browse files Browse the repository at this point in the history
explicitly list desired columns in select statement
  • Loading branch information
ferras authored Jun 10, 2020
2 parents 2e25f99 + 78a930b commit 4459966
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ version: "3"
services:
metadata:
image: "metadata_service:latest"
restart: always
container_name: "metadata_service"
ports:
- "${MF_METADATA_PORT:-8080}:${MF_METADATA_PORT:-8080}"
Expand Down
25 changes: 19 additions & 6 deletions metadata_service/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class AsyncPostgresTable(object):
_command = None
_insert_command = None
_filters = None
_base_query = "SELECT * from"
_base_query = "SELECT {0} from"
_row_type = None

def __init__(self):
Expand All @@ -92,16 +92,16 @@ async def get_records(self, filter_dict={}, fetch_single=False,
if bool(filter_dict):
where_clause = "where " + seperator.join(filters)

sql_template = "select * from {0} {1}"
sql_template = "select {0} from {1} {2}"

if ordering is not None:
sql_template = sql_template + " {2}"
sql_template = sql_template + " {3}"

if limit is not None:
sql_template = sql_template + " {3}"
sql_template = sql_template + " {4}"

select_sql = sql_template.format(
self.table_name, where_clause, ordering, limit
self.keys, self.table_name, where_clause, ordering, limit
).rstrip()

try:
Expand Down Expand Up @@ -186,6 +186,7 @@ async def create_if_missing(table_name, command):
class AsyncFlowTablePostgres(AsyncPostgresTable):
flow_dict = {}
table_name = "flows_v3"
keys = "flow_id, user_name, ts_epoch, tags, system_tags"
_command = """
CREATE TABLE {0} (
flow_id VARCHAR(255) PRIMARY KEY,
Expand Down Expand Up @@ -222,6 +223,7 @@ class AsyncRunTablePostgres(AsyncPostgresTable):
_current_count = 0
_row_type = RunRow
table_name = "runs_v3"
keys = "flow_id, run_number, user_name, ts_epoch, tags, system_tags"
flow_table_name = AsyncFlowTablePostgres.table_name
_command = """
CREATE TABLE {0} (
Expand Down Expand Up @@ -262,6 +264,7 @@ class AsyncStepTablePostgres(AsyncPostgresTable):
run_to_step_dict = {}
_row_type = StepRow
table_name = "steps_v3"
keys = "flow_id, run_number, step_name, user_name, ts_epoch, tags, system_tags"
run_table_name = AsyncRunTablePostgres.table_name
_command = """
CREATE TABLE {0} (
Expand Down Expand Up @@ -309,6 +312,8 @@ class AsyncTaskTablePostgres(AsyncPostgresTable):
_current_count = 0
_row_type = TaskRow
table_name = "tasks_v3"
keys = "flow_id, run_number, step_name, task_id, user_name, ts_epoch, " \
"tags, system_tags"
step_table_name = AsyncStepTablePostgres.table_name
_command = """
CREATE TABLE {0} (
Expand Down Expand Up @@ -361,6 +366,8 @@ class AsyncMetadataTablePostgres(AsyncPostgresTable):
_current_count = 0
_row_type = MetadataRow
table_name = "metadata_v3"
keys = "flow_id, run_number, step_name, task_id, id, field_name, value, " \
"type, user_name, ts_epoch, tags, system_tags"
task_table_name = AsyncTaskTablePostgres.table_name
_command = """
CREATE TABLE {0} (
Expand Down Expand Up @@ -410,7 +417,10 @@ async def add_metadata(
return await self.create_record(dict)

async def get_metadata_in_runs(self, flow_id: str, run_id: int):
filter_dict = {"flow_id": "'{0}'".format(flow_id), "run_number": str(run_id)}
filter_dict = {
"flow_id": "'{0}'".format(flow_id),
"run_number": str(run_id)
}
return await self.get_records(filter_dict=filter_dict)

async def get_metadata(
Expand All @@ -433,6 +443,9 @@ class AsyncArtifactTablePostgres(AsyncPostgresTable):
current_count = 0
_row_type = ArtifactRow
table_name = "artifact_v3"
keys = "flow_id, run_number, step_name, task_id, name, location, " \
"ds_type, sha, type, content_type, user_name, attempt_id, " \
"ts_epoch, tags, system_tags"
task_table_name = AsyncTaskTablePostgres.table_name
ordering = "ORDER BY attempt_id DESC"
_command = """
Expand Down

0 comments on commit 4459966

Please sign in to comment.