# table_maker.py
import pandas as pd
import pandas.api.types as t
from extent_table import ExtentTable
import logging
import sys
import numpy as np
# Configure the root logger at import time: records of level INFO and above are written to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
class TableMaker:
    """
    Flattens nested JSON objects into flat tables stored in an ``ExtentTable``.

    Mapping rules implemented by this class:

    * a scalar attribute is stored as a value of the current table;
    * a nested object (``dict``) is stored in a table named after the attribute, and the
      id it received is stored in the current table under that attribute name;
    * a list is stored in a table named ``<table>_?_<attribute>`` with one row per element;
      elements that are objects are additionally stored in a table named
      ``<table>_?_<attribute>_$_<first sorted key of the object>``.
    """

    ROOT_TABLE = "root"

    # ``str()`` forms recognised by ``cast_object_to_bool`` in ``save_tables``.
    _BOOLEAN_TOKENS = {"True": True, "False": False}
    _MISSING_TOKENS = frozenset({"None", "nan", "<NA>"})
    _CASTABLE_TOKENS = _MISSING_TOKENS.union(_BOOLEAN_TOKENS)

    def __init__(self, extent_table: "ExtentTable"):
        """
        :param extent_table: the table store that receives every value produced by this class
        """
        self.__extent_table = extent_table

    def convert_json_objects_to_tables(self, json_objects: list, name: str) -> None:
        """
        Convert every object of ``json_objects`` into rows of the table ``name``.

        :param json_objects: the JSON objects (dicts) to convert
        :param name: the table that receives the top-level attributes of each object
        :return: nothing
        """
        for json_object in json_objects:
            self.convert_json_object_to_table(json_object, name)

    def convert_json_object_to_table(self, json_object: dict, name: str) -> int:
        """
        Convert one JSON object into a row of the table ``name``.

        :param json_object: the JSON object to convert
        :param name: the table that receives the attributes of the object
        :return: the id the table reported for this object before it was incremented,
                 or -1 when the object has no attributes (nothing is stored in that case)
        """
        if not self.__populate_table(json_object, name):
            return -1
        # Read the id first, then advance it, so the returned id is the one of this object.
        current_id = self.__extent_table.get_current_id(name)
        self.__extent_table.increment_current_id(name)
        return current_id

    def __populate_table(self, json_object: dict, name: str) -> bool:
        """
        Store every attribute of ``json_object`` in the table ``name``.

        :return: True when at least one attribute was processed, False for an empty object
        """
        for attribute, value in json_object.items():
            self.__add_values_to_table(name, attribute, value)
        return bool(json_object)

    def __add_values_to_table(self, table_name: str, attribute: str, value) -> None:
        """
        Dispatch ``value`` to the handler matching its kind: object, list or scalar.
        """
        if self.__is_value_complex(value):
            self.__add_complex_value_to_table(table_name, attribute, value)
        elif self.__is_multivalued(value):
            self.__add_iterable_to_table(table_name, attribute, value)
        else:
            self.__add_scalar_value_to_table(table_name, attribute, value)

    def __add_complex_value_to_table(self, table_name: str, attribute: str, value: dict) -> None:
        """
        Store a nested object in the table named after ``attribute`` and reference it
        from ``table_name`` through the id it received. Empty objects are not referenced.
        """
        reference_table_name = attribute
        reference_table_id = self.convert_json_object_to_table(value, reference_table_name)
        if reference_table_id >= 0:
            self.__extent_table.add_value(table_name, reference_table_name, reference_table_id)

    def __add_scalar_value_to_table(self, table_name: str, attribute: str, value) -> None:
        """
        Store a scalar ``value`` under ``attribute`` in ``table_name``.
        """
        self.__extent_table.add_value(table_name, attribute, value)

    def __add_iterable_to_table(self, table_name: str, attribute: str, values: list) -> None:
        """
        Store the elements of a list attribute in the table ``<table_name>_?_<attribute>``.

        Each element produces one row holding its id, the current id of the parent table
        (as a string), whether it is a scalar and, for scalars, ``str(element)``.
        Elements that are objects are also converted into their own table; the conversion
        works on a shallow copy extended with the parent column, so the caller's input is
        never modified. An empty object is recorded as a row but produces no table of its own.
        Empty lists are ignored.
        """
        if not values:
            return
        multivalued_table_name = table_name + "_?_" + attribute
        self.__extent_table.create_table(table_name)  # creates table if none existent
        columns = [ExtentTable.ID_COLUMN, ExtentTable.PARENT_COLUMN, ExtentTable.IS_SCALAR, ExtentTable.SCALAR_VALUE]
        self.__extent_table.create_table_from_columns(multivalued_table_name, columns)
        parent_table_current_id = self.__extent_table.get_current_id(table_name)

        rows = []
        for value in values:
            row_id = self.__extent_table.get_current_id(multivalued_table_name)
            is_scalar = not self.__is_value_complex(value)
            rows.append({
                ExtentTable.ID_COLUMN: row_id,
                ExtentTable.PARENT_COLUMN: str(parent_table_current_id),
                ExtentTable.IS_SCALAR: is_scalar,
                ExtentTable.SCALAR_VALUE: str(value) if is_scalar else None,
            })
            if not is_scalar and value:
                # The table name is derived from the element's own keys, before the
                # parent column is added to the copy.
                reference_table_name = self.__generate_table_name_from_complex_attribute(multivalued_table_name, value)
                child = dict(value)
                child[ExtentTable.PARENT_COLUMN] = row_id
                self.convert_json_object_to_table(child, reference_table_name)
            # NOTE(review): the rest of the class advances ids with ``increment_current_id``;
            # confirm against ExtentTable that ``increment_current_id_pointer`` is intended here.
            self.__extent_table.increment_current_id_pointer(multivalued_table_name)

        self.__extent_table.concat_tables(multivalued_table_name, pd.DataFrame(rows))

    def __is_multivalued(self, value) -> bool:
        """
        :return: True when ``value`` is a list
        """
        return isinstance(value, list)

    def __is_value_complex(self, value) -> bool:
        """
        :return: True when ``value`` is a dict (a nested object)
        """
        return isinstance(value, dict)

    def show_tables(self, num_elements: int = 5) -> None:
        """
        Log the first ``num_elements`` rows of every table at INFO level.

        :param num_elements: number of rows shown per table
        :return: nothing
        """
        tables = self.__extent_table.get_all_tables()
        logging.info("\n")
        logging.info("MOSTRANT TAULES xD\n")
        for table_name, table in tables:
            logging.info("\nTable: %s\n%s\n___________________\n\n", table_name, table.head(num_elements))

    def save_tables(self, directory: str, export_as="csv", sql_connection=None, cast_none_to_nan: bool = False,
                    cast_object_to_bool: bool = False) -> None:
        """
        Export every table. The requested casts are applied to the stored tables in place.

        :param sql_connection: the sql connection if you export as sql. Otherwise just ignore the parameter
        :param directory: prefix prepended verbatim to each table name (include the trailing path
                          separator for csv and html). For sql, pass in the root name shared by all tables
        :param export_as: allowed values are: "csv", "sql", "html"; any other value is exported as html
                          using that value as the file extension
        :param cast_none_to_nan: Cast all None value to NaN (real null values)
        :param cast_object_to_bool: Cast all columns containing only boolean and missing values to boolean
                                    columns. Missing values are kept as missing (nullable "boolean" dtype)
                                    and columns without any boolean value are left untouched
        :return: nothing
        """
        for table_name, table in self.__extent_table.get_all_tables():
            if cast_none_to_nan:
                table.replace(to_replace=[None], value=np.nan, inplace=True)
            if cast_object_to_bool:
                self.__cast_boolean_columns(table)
            target = directory + table_name
            if export_as == "csv":
                table.to_csv(target + "." + export_as, index=False)
            elif export_as == "sql":
                table.to_sql(target, con=sql_connection)
            else:
                table.to_html(target + "." + export_as, index=False)

    def __cast_boolean_columns(self, table: pd.DataFrame) -> None:
        """
        Convert, in place, the columns of ``table`` that hold only booleans and missing values.

        Values are mapped by their ``str()`` form instead of by truthiness, so the text
        "False" becomes False and a missing value stays missing instead of becoming a boolean.
        """
        for column in table.columns:
            tokens = [str(item) for item in table[column]]
            distinct = set(tokens)
            if not distinct <= self._CASTABLE_TOKENS:
                continue
            if distinct.isdisjoint(self._BOOLEAN_TOKENS):
                # Only missing markers (or no rows): there is nothing boolean to cast.
                continue
            if distinct.isdisjoint(self._MISSING_TOKENS):
                table[column] = np.array([self._BOOLEAN_TOKENS[token] for token in tokens], dtype=bool)
            else:
                # A plain bool column cannot represent missing values; use the nullable dtype.
                table[column] = pd.array(
                    [self._BOOLEAN_TOKENS.get(token, pd.NA) for token in tokens], dtype="boolean"
                )

    def __generate_table_name_from_complex_attribute(self, base_name: str, value: dict) -> str:
        """
        Build the table name for an object stored inside a list.

        :param base_name: name of the table holding the list elements
        :param value: the object; its smallest key (in sorted order) is used as suffix
        :return: ``<base_name>_$_<smallest key>``; the suffix is empty for an empty object
        """
        return base_name + "_$_" + str(min(value, default=""))