From 2ee9eb33e9dd95b4554f0eafa2a179e0fe6bad51 Mon Sep 17 00:00:00 2001 From: gmichalo Date: Fri, 27 Apr 2018 17:16:27 -0400 Subject: [PATCH 1/8] updating reader --- holoclean/utils/reader.py | 89 +-------------------- reader.py | 164 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 88 deletions(-) create mode 100644 reader.py diff --git a/holoclean/utils/reader.py b/holoclean/utils/reader.py index dd21f2b3..230fac3e 100644 --- a/holoclean/utils/reader.py +++ b/holoclean/utils/reader.py @@ -1,6 +1,5 @@ from holoclean.global_variables import GlobalVariables from pyspark.sql.functions import * -from pyspark.sql.types import StructField, StructType, StringType, LongType class Reader: @@ -13,7 +12,6 @@ class Reader: def __init__(self, spark_session): """ Constructing reader object - :param spark_session: The spark_session we created in Holoclean object """ self.spark_session = spark_session @@ -22,7 +20,6 @@ def __init__(self, spark_session): def _findextesion(self, filepath): """ Finds the extesion of the file. - :param filepath: The path to the file """ extention = filepath.split('.')[-1] @@ -31,12 +28,9 @@ def _findextesion(self, filepath): def read(self, filepath, indexcol=0, schema=None): """ Calls the appropriate reader for the file - :param schema: optional schema when known :param filepath: The path to the file - :return: data frame of the read data - """ if self._findextesion(filepath) == "csv": csv_obj = CSVReader() @@ -58,12 +52,10 @@ def __init__(self): def read(self, file_path, spark_session, indexcol=0, schema=None): """ Creates a dataframe from the csv file - :param indexcol: if 1, create a tuple id column as auto increment :param schema: optional schema of file if known :param spark_session: The spark_session we created in Holoclean object :param file_path: The path to the file - :return: dataframe """ if schema is None: @@ -75,90 +67,11 @@ def read(self, file_path, spark_session, indexcol=0, schema=None): return df index_name = GlobalVariables.index_name - new_cols = df.schema.names + [index_name] - list_schema = [] - for index_attribute in range(len(df.schema.names)): - list_schema.append(StructField("_" + str(index_attribute), - df.schema[ - index_attribute].dataType, - True)) - list_schema.append( - StructField("_" + str(len(new_cols)), LongType(), True)) - - schema = StructType(list_schema) ix_df = df.rdd.zipWithIndex().map( - lambda (row, ix): row + (ix + 1,)).toDF(schema) + lambda (row, ix): row + (ix + 1,)).toDF() tmp_cols = ix_df.schema.names new_df = reduce(lambda data, idx: data.withColumnRenamed(tmp_cols[idx], new_cols[idx]), xrange(len(tmp_cols)), ix_df) - new_df = self.checking_string_size(new_df) return new_df - - def checking_string_size(self, dataframe): - """ - This method checks if the dataframe has columns with strings with more - than 250 characters - - :param dataframe: the initial dataframe - :return: dataframe: a new dataframe without the columns with strings - with more than 250 characters - """ - - columns = set([]) - for row in dataframe.collect(): - for attribute in dataframe.columns: - if isinstance(row[attribute], unicode) and\ - len(row[attribute]) > 250: - columns.add(attribute) - if len(columns) > 0: - dataframe = self.ignore_columns(columns, dataframe) - return dataframe - - def ignore_columns(self, columns, dataframe): - """ - This method asks the user if he wants to drop a column which has a - string with more than 250 characters - - :param columns: a set of columns with strings with more - than 250 characters - 
:param dataframe: the dataframe that we want to change - - :return: dataframe: a new dataframe - """ - print("Holoclean cannot use dataframes with strings " - "more than 250 characters") - for column in columns: - answer = raw_input( - "The column " + column + " has a string of length " - "more than 250 characters.Do you want to drop this column" - " (y/n)?") - while answer != "y" and answer != "n": - answer = raw_input( - "the column " + column + " has a string of length" - "more than 250 characters." - "Do you want to drop this column" - "(y/n)?") - if answer == "y": - dataframe = self.drop_column(column, dataframe) - else: - print \ - "Holoclean cannot use dataframes with strings " \ - "more than 250 characters. please check your dataset" - exit(5) - return dataframe - - def drop_column(self, column, dataframe): - """ - This method drop a specific column from a dataframe - - :param column: a column that will be dropped - :param dataframe: the dataframe that will be change - - :return: dataframe - """ - return dataframe.drop(column) - - - diff --git a/reader.py b/reader.py new file mode 100644 index 00000000..dd21f2b3 --- /dev/null +++ b/reader.py @@ -0,0 +1,164 @@ +from holoclean.global_variables import GlobalVariables +from pyspark.sql.functions import * +from pyspark.sql.types import StructField, StructType, StringType, LongType + + +class Reader: + + """ + Reader class: + Finds the extension of the file and calls the appropriate reader + """ + + def __init__(self, spark_session): + """ + Constructing reader object + + :param spark_session: The spark_session we created in Holoclean object + """ + self.spark_session = spark_session + + # Internal Methods + def _findextesion(self, filepath): + """ + Finds the extesion of the file. + + :param filepath: The path to the file + """ + extention = filepath.split('.')[-1] + return extention + + def read(self, filepath, indexcol=0, schema=None): + """ + Calls the appropriate reader for the file + + :param schema: optional schema when known + :param filepath: The path to the file + + :return: data frame of the read data + + """ + if self._findextesion(filepath) == "csv": + csv_obj = CSVReader() + df = csv_obj.read(filepath, self.spark_session, indexcol, schema) + return df + else: + print("This extension doesn't support") + + +class CSVReader: + """ + CSVReader class: Reads a csv file and send its content back + """ + + def __init__(self): + pass + + # Setters + def read(self, file_path, spark_session, indexcol=0, schema=None): + """ + Creates a dataframe from the csv file + + :param indexcol: if 1, create a tuple id column as auto increment + :param schema: optional schema of file if known + :param spark_session: The spark_session we created in Holoclean object + :param file_path: The path to the file + + :return: dataframe + """ + if schema is None: + df = spark_session.read.csv(file_path, header=True) + else: + df = spark_session.read.csv(file_path, header=True, schema=schema) + + if indexcol == 0: + return df + + index_name = GlobalVariables.index_name + + new_cols = df.schema.names + [index_name] + list_schema = [] + for index_attribute in range(len(df.schema.names)): + list_schema.append(StructField("_" + str(index_attribute), + df.schema[ + index_attribute].dataType, + True)) + list_schema.append( + StructField("_" + str(len(new_cols)), LongType(), True)) + + schema = StructType(list_schema) + ix_df = df.rdd.zipWithIndex().map( + lambda (row, ix): row + (ix + 1,)).toDF(schema) + tmp_cols = ix_df.schema.names + new_df = reduce(lambda data, 
idx: data.withColumnRenamed(tmp_cols[idx], + new_cols[idx]), + xrange(len(tmp_cols)), ix_df) + new_df = self.checking_string_size(new_df) + return new_df + + def checking_string_size(self, dataframe): + """ + This method checks if the dataframe has columns with strings with more + than 250 characters + + :param dataframe: the initial dataframe + :return: dataframe: a new dataframe without the columns with strings + with more than 250 characters + """ + + columns = set([]) + for row in dataframe.collect(): + for attribute in dataframe.columns: + if isinstance(row[attribute], unicode) and\ + len(row[attribute]) > 250: + columns.add(attribute) + if len(columns) > 0: + dataframe = self.ignore_columns(columns, dataframe) + return dataframe + + def ignore_columns(self, columns, dataframe): + """ + This method asks the user if he wants to drop a column which has a + string with more than 250 characters + + :param columns: a set of columns with strings with more + than 250 characters + :param dataframe: the dataframe that we want to change + + :return: dataframe: a new dataframe + """ + print("Holoclean cannot use dataframes with strings " + "more than 250 characters") + for column in columns: + answer = raw_input( + "The column " + column + " has a string of length " + "more than 250 characters.Do you want to drop this column" + " (y/n)?") + while answer != "y" and answer != "n": + answer = raw_input( + "the column " + column + " has a string of length" + "more than 250 characters." + "Do you want to drop this column" + "(y/n)?") + if answer == "y": + dataframe = self.drop_column(column, dataframe) + else: + print \ + "Holoclean cannot use dataframes with strings " \ + "more than 250 characters. please check your dataset" + exit(5) + return dataframe + + def drop_column(self, column, dataframe): + """ + This method drop a specific column from a dataframe + + :param column: a column that will be dropped + :param dataframe: the dataframe that will be change + + :return: dataframe + """ + return dataframe.drop(column) + + + From 127ae1d3d42d91ff10e98c86ae6b66d5a7ad9931 Mon Sep 17 00:00:00 2001 From: gmichalo Date: Fri, 27 Apr 2018 17:23:21 -0400 Subject: [PATCH 2/8] update pruning methods --- holoclean/holoclean.py | 7 +- holoclean/utils/cooccurpruning.py | 541 +++++++++++++++++++++++++++ holoclean/utils/pruning.py | 592 ++---------------------------- 3 files changed, 575 insertions(+), 565 deletions(-) create mode 100644 holoclean/utils/cooccurpruning.py diff --git a/holoclean/holoclean.py b/holoclean/holoclean.py index da2403d8..dcafbcbd 100644 --- a/holoclean/holoclean.py +++ b/holoclean/holoclean.py @@ -11,7 +11,7 @@ from dataengine import DataEngine from dataset import Dataset from featurization.database_worker import DatabaseWorker, PopulateTensor -from utils.pruning import Pruning +from utils.cooccurpruning import CooccurPruning from utils.parser_interface import ParserInterface, DenialConstraint import multiprocessing @@ -650,10 +650,13 @@ def _ds_domain_pruning(self, pruning_threshold1, pruning_threshold2, 'starting domain pruning with threshold %s', pruning_threshold1) - self.pruning = Pruning( + self.pruning = CooccurPruning( self, pruning_threshold1, pruning_threshold2, pruning_dk_breakoff, pruning_clean_breakoff) + self.pruning.get_domain() + + self.pruning._create_dataframe() self.holo_env.logger.info('Domain pruning is finished :') return diff --git a/holoclean/utils/cooccurpruning.py b/holoclean/utils/cooccurpruning.py new file mode 100644 index 00000000..aa65b605 --- /dev/null +++ 
b/holoclean/utils/cooccurpruning.py @@ -0,0 +1,541 @@ +import random +from holoclean.global_variables import GlobalVariables +from pruning import Pruning, RandomVar +import time + +class CooccurPruning(Pruning): + """Pruning class: Creates the domain table for all the cells""" + + def __init__(self, session, + threshold1=0.1, + threshold2=0.3, + dk_breakoff=3, + clean_breakoff=10 + ): + """ + Initializing the pruning object + :param session: session object + :param threshold1: to limit possible values for training data + :param threshold2: to limit possible values for dirty data + :param dk_breakoff: to limit possible values for dirty data to + less than k values + :param clean_breakoff: to limit possible values for training data to + less than k values + """ + super(CooccurPruning, self).__init__(session) + self.threshold1 = threshold1 + self.threshold2 = threshold2 + self.dk_breakoff = dk_breakoff + self.clean_breakoff = clean_breakoff + self.dataset = session.dataset + self.assignments = {} + self.cell_domain_nb = {} + self.domain_stats = {} + self.domain_pair_stats = {} + self.column_to_col_index_dict = {} + self.attribute_to_be_pruned = {} + self.coocurence_lookup = {} + self.all_cells = [] + self.all_cells_temp = {} + self.index = 0 + + # Internal Method + def _compute_number_of_coocurences( + self, + original_attribute, + original_attr_value, + cooccured_attribute, + cooccured_attr_value): + """ + _compute_number_of_coocurences creates conditional probabilities for + each cell with the attribute and value of each other cell in the same + row + :param original_attribute: the name of first attribute + :param original_attr_value: the initial value of the first attribute + :param cooccured_attribute: the name of second attribute + :param cooccured_attr_value: the initial value of the second attribute + :return: probability + """ + if (original_attr_value, cooccured_attr_value) not in \ + self.domain_pair_stats[ + original_attribute][cooccured_attribute]: + return None + + cooccur_count = \ + self.domain_pair_stats[original_attribute][cooccured_attribute][( + original_attr_value, cooccured_attr_value)] + + value_count = self.domain_stats[original_attribute][ + original_attr_value] + + # Compute counter + if original_attr_value is None or cooccured_attr_value is None: + probability = 0 + else: + probability = cooccur_count / value_count + return probability + + # We need a new function like find_domain for clean cells + # such that it does not limit the domain to the possible values + # above the threshold + # first iteration, use a low threshold (i.e. 
0) and limit using k + + def _find_dk_domain(self, assignment, trgt_attr): + """ + This method finds the domain for each dirty cell for inference + :param assignment: the values for every attribute + :param trgt_attr: the name of attribute + :return: dictionary of cells values + """ + # cell_probabilities will hold domain values and their probabilities + cell_probabilities = [] + # always have the initial value in the returned domain values unless + # it is null + if assignment[trgt_attr] is not None: + cell_values = {(assignment[trgt_attr])} + else: + cell_values = {()} + for attr in assignment: + if attr == trgt_attr: + continue + attr_val = assignment[attr] + + if attr in self.coocurence_lookup: + if attr_val in self.coocurence_lookup[attr]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + cell_probabilities += \ + [(k, v) for + k, v in + self.coocurence_lookup[attr][attr_val][ + trgt_attr].iteritems()] + + # Sort cell_values and chop after k and chop below threshold2 + cell_probabilities.sort(key=lambda t: t[1], reverse=True) + + for tuple in cell_probabilities: + value = tuple[0] + probability = tuple[1] + if len(cell_values) == self.dk_breakoff or \ + probability < self.threshold2: + break + cell_values.add(value) + return cell_values + + def _find_clean_domain(self, assignment, trgt_attr): + """ + This method finds the domain for each clean cell for learning + :param assignment: dictionary of values + :param trgt_attr: attribute of other column + :return: dictionary of cells values + """ + cell_probabilities = [] + + # Always have the initial value in the returned domain values unless + # it is Null + if assignment[trgt_attr] is not None: + cell_values = {(assignment[trgt_attr])} + else: + cell_values = {()} + for attr in assignment: + if attr == trgt_attr: + continue + attr_val = assignment[attr] + + if attr in self.coocurence_lookup: + if attr_val in self.coocurence_lookup[attr]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + cell_probabilities += \ + [(k, v) for + k, v + in + self.coocurence_lookup[attr][attr_val][ + trgt_attr].iteritems()] + + # get l values from the lookup exactly like in dirty where l < k + # get k-l random once from the domain + cell_probabilities.sort(key=lambda t: t[1]) + while len(cell_probabilities) > 0: # for now l = k/2 + if len(cell_values) == self.clean_breakoff/2: + break + tuple = cell_probabilities.pop() + value = tuple[0] + cell_values.add(value) + + random.shuffle(cell_probabilities) + + while len(cell_probabilities) > 0: + if len(cell_values) == self.clean_breakoff: + break + tuple = cell_probabilities.pop() + value = tuple[0] + cell_values.add(value) + return cell_values + + def _preprop(self): + """ + Preprocessing phase: create the dictionary with all the attributes. 
+ :return: Null + """ + + # This part creates dictionary to find column index + + row_dictionary_element = self.cellvalues[1] + for cid in row_dictionary_element: + cell = row_dictionary_element[cid] + self.column_to_col_index_dict[cell.columnname] = cid + + # This part makes empty dictionary for each attribute + for col in self.column_to_col_index_dict: + self.domain_stats[col] = {} + + # This part adds other (dirty) attributes + # to the dictionary of the key attribute + for column_name_key in self.column_to_col_index_dict: + self.domain_pair_stats[column_name_key] = {} + for col2 in self.dirty_cells_attributes: + if col2 != column_name_key: + self.domain_pair_stats[column_name_key][col2] = {} + return + + def _analyze_entries(self): + """ + Analyze entries creates a dictionary with occurrences of the attributes + :return: Null + """ + # Iterate over tuples to create to dictionary + for tupleid in self.cellvalues: + # Iterate over attributes and grab counts for create dictionary + # that how for each attribute how many times we see each value + for cid in self.cellvalues[tupleid]: + cell = self.cellvalues[tupleid][cid] + col = cell.columnname + val = cell.value + if col in self.dirty_cells_attributes: + self.all_cells.append(cell) + self.cellvalues[tupleid][cid].domain = 1 + self.all_cells_temp[cell.cellid] = cell + + if val not in self.domain_stats[col]: + self.domain_stats[col][val] = 0.0 + self.domain_stats[col][val] += 1.0 + + # Iterate over target attributes and grab + # counts of values with other attributes + for col in self.domain_pair_stats: + cid = self.column_to_col_index_dict[col] + for tgt_col in self.domain_pair_stats[col]: + tgt_cid = self.column_to_col_index_dict[tgt_col] + tgt_val = self.cellvalues[tupleid][tgt_cid].value + val = self.cellvalues[tupleid][cid].value + assgn_tuple = (val, tgt_val) + if assgn_tuple not in self.domain_pair_stats[col][tgt_col]: + self.domain_pair_stats[col][tgt_col][assgn_tuple] = 0.0 + self.domain_pair_stats[col][tgt_col][assgn_tuple] += 1.0 + return + + def _generate_coocurences(self): + """ + This method creates all value of co-occurences + :return: Null + """ + for original_attribute in self.domain_pair_stats: + # For each column in the cooccurences + self.coocurence_lookup[original_attribute] = {} + # It creates a dictionary + for cooccured_attribute in \ + self.domain_pair_stats[original_attribute]: + # For second column in the cooccurences Over + # Pair of values that appeared together + # (original_attribute value , cooccured_attribute value) + for assgn_tuple in self.domain_pair_stats[ + original_attribute][ + cooccured_attribute]: + co_prob = self._compute_number_of_coocurences( + original_attribute, assgn_tuple[0], + cooccured_attribute, + assgn_tuple[1]) + + if co_prob > self.threshold1: + if assgn_tuple[0] not in \ + self.coocurence_lookup[ + original_attribute]: + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]] = {} + + if cooccured_attribute not in \ + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]]: + self.coocurence_lookup[ + original_attribute][ + assgn_tuple[0]][cooccured_attribute] = {} + + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]][ + cooccured_attribute][ + assgn_tuple[1]] = co_prob + return + + def _generate_assignments(self): + """ + Generate_assignments creates assignment for each cell with the + attribute and value of each other cell in the same row + :return: Null + """ + for cell in self.all_cells: + tplid = cell.tupleid + trgt_attr = cell.columnname + + # assignment 
is a dictionary for each cell copied + # the row of that cell + # with all attribute to find the cooccurance + assignment = {} + for cid in self.cellvalues[tplid]: + c = self.cellvalues[tplid][cid] + assignment[c.columnname] = c.value + self.assignments[cell] = assignment + self.attribute_to_be_pruned[cell.cellid] = trgt_attr + return + + def _find_cell_domain(self): + """ + Find_cell_domain finds the domain for each cell + :return: Null + """ + for cell in self.assignments: + # In this part we get all values for cell_index's + # attribute_to_be_pruned + + # if the cell is dirty call find domain + # else, call get negative examples (domain for clean cells) + if cell.dirty == 1: + self.cell_domain[cell.cellid] = self._find_dk_domain( + self.assignments[cell], + self.attribute_to_be_pruned[cell.cellid]) + else: + self.cell_domain[cell.cellid] = self._find_clean_domain( + self.assignments[cell], + self.attribute_to_be_pruned[cell.cellid]) + return + + def _append_possible(self, v_id, value, dataframe, cell_index, k_ij): + """ + Appends possible values to a list + :param v_id: variable id + :param value: value + :param dataframe: list + :param cell_index: index of cell + :param k_ij: domain id + """ + if value != self.all_cells_temp[cell_index].value: + dataframe.append( + [v_id, (self.all_cells_temp[cell_index].tupleid + 1), + self.all_cells_temp[cell_index].columnname, + unicode(value), 0, k_ij]) + else: + dataframe.append( + [v_id, (self.all_cells_temp[cell_index].tupleid + 1), + self.all_cells_temp[cell_index].columnname, + unicode(value), 1, k_ij]) + + def _create_dataframe(self): + """ + Creates spark dataframes from cell_domain for all the cells + :return: Null + """ + + attributes = self.dataset.get_schema('Init') + domain_dict = {} + domain_kij_clean = [] + domain_kij_dk = [] + for attribute in attributes: + if attribute != GlobalVariables.index_name: + domain_dict[attribute] = set() + + possible_values_clean = [] + possible_values_dirty = [] + self.v_id_clean_list = [] + self.v_id_dk_list = [] + v_id_clean = v_id_dk = 0 + + for tuple_id in self.cellvalues: + for cell_index in self.cellvalues[tuple_id]: + attribute = self.cellvalues[tuple_id][cell_index].columnname + value = self.cellvalues[tuple_id][cell_index].value + domain_dict[attribute].add(value) + + if self.cellvalues[tuple_id][cell_index].dirty == 1: + tmp_cell_index = self.cellvalues[tuple_id][ + cell_index].cellid + if self.cellvalues[tuple_id][cell_index].domain == 1: + k_ij = 0 + v_id_dk = v_id_dk + 1 + + self.v_id_dk_list.append([( + self.cellvalues[tuple_id][cell_index].tupleid +1), + self.cellvalues[tuple_id][cell_index].columnname, + tmp_cell_index]) + + for value in self.cell_domain[tmp_cell_index]: + if value != (): + k_ij = k_ij + 1 + + self._append_possible( + v_id_dk, + value, + possible_values_dirty, + tmp_cell_index, k_ij + ) + + domain_kij_dk.append([v_id_dk, ( + self.all_cells_temp[tmp_cell_index].tupleid + 1), + self.all_cells_temp[tmp_cell_index].columnname, + k_ij]) + else: + + tmp_cell_index = self.cellvalues[tuple_id][ + cell_index].cellid + + if self.cellvalues[tuple_id][cell_index].domain == 1: + if len(self.cell_domain[tmp_cell_index]) > 1: + k_ij = 0 + v_id_clean = v_id_clean + 1 + + self.v_id_clean_list.append([ + (self.all_cells_temp[ + tmp_cell_index + ].tupleid + 1), + self.cellvalues[tuple_id][cell_index].columnname, tmp_cell_index]) + + for value in self.cell_domain[tmp_cell_index]: + if value != 0: + k_ij = k_ij + 1 + + self._append_possible( + v_id_clean, + value, + possible_values_clean, + 
tmp_cell_index, k_ij) + + domain_kij_clean.append([ + v_id_clean, + (self.all_cells_temp[ + tmp_cell_index + ].tupleid + 1), + self.cellvalues[tuple_id][cell_index].columnname, k_ij]) + + self.all_cells = None + self.all_cells_temp = None + + # Create possible table + df_possible_clean = self.spark_session.createDataFrame( + possible_values_clean, self.dataset.attributes['Possible_values'] + ) + + self.dataengine.add_db_table('Possible_values_clean', + df_possible_clean, self.dataset) + self.dataengine.add_db_table_index( + self.dataset.table_specific_name('Possible_values_clean'), + 'attr_name') + + df_possible_dk = self.spark_session.createDataFrame( + possible_values_dirty, self.dataset.attributes['Possible_values']) + + self.dataengine.add_db_table('Possible_values_dk', + df_possible_dk, self.dataset) + self.dataengine.add_db_table_index( + self.dataset.table_specific_name('Possible_values_dk'), + 'attr_name') + + df_kij = self.spark_session.createDataFrame( + domain_kij_dk, self.dataset.attributes['Kij_lookup']) + self.dataengine.add_db_table('Kij_lookup_dk', + df_kij, self.dataset) + + df_kij = self.spark_session.createDataFrame( + domain_kij_clean, self.dataset.attributes['Kij_lookup']) + self.dataengine.add_db_table('Kij_lookup_clean', + df_kij, self.dataset) + + self.dataengine.holo_env.logger.info('The table: ' + + self.dataset.table_specific_name( + 'Kij_lookup_clean') + + " has been created") + self.dataengine.holo_env.logger.info(" ") + self.dataengine.holo_env.logger.info('The table: ' + + self.dataset.table_specific_name( + 'Possible_values_dk') + + " has been created") + self.dataengine.holo_env.logger.info(" ") + + create_feature_id_map = "Create TABLE " + \ + self.dataset.table_specific_name( + "Feature_id_map") + \ + "( feature_ind INT," \ + " attribute VARCHAR(255)," \ + " value VARCHAR(255)," \ + " type VARCHAR(255) );" + + self.dataengine.query(create_feature_id_map) + + query_observed = "CREATE TABLE " + \ + self.dataset.table_specific_name( + 'Observed_Possible_values_clean') + \ + " AS SELECT * FROM " + \ + self.dataset.table_specific_name( + 'Possible_values_clean') + " as t1 " + \ + " WHERE " \ + " t1.observed=1;" + + self.dataengine.query(query_observed) + + query_observed = \ + "CREATE TABLE " + \ + self.dataset.table_specific_name('Observed_Possible_values_dk') + \ + " AS SELECT * FROM " + \ + self.dataset.table_specific_name('Possible_values_dk') + \ + " as t1 WHERE " \ + "t1.observed=1;" + + self.dataengine.query(query_observed) + self.assignments = None + self.attribute_to_be_pruned = None + self.attribute_map = None + + return + + def get_domain(self): + """ + :return: + :rtype: + """ + session = self.session + + t1 = time.time() + self._preprop() + t2 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_preprop " + str(t2 - t1)) + + self._analyze_entries() + t3 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_analyze_entries " + str(t3 - t2)) + + self._generate_assignments() + t4 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info( + "_generate_assignments " + str(t4 - t3)) + + self._generate_coocurences() + t5 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info( + "_generate_coocurences " + str(t5 - t4)) + + self._find_cell_domain() + t6 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_find_cell_domain " + str(t6 - t5)) diff --git a/holoclean/utils/pruning.py b/holoclean/utils/pruning.py index e87bef28..cf65f3ea 100644 --- 
a/holoclean/utils/pruning.py +++ b/holoclean/utils/pruning.py @@ -1,6 +1,5 @@ -import random +from abc import ABCMeta, abstractmethod from holoclean.global_variables import GlobalVariables -import time class RandomVar: @@ -9,109 +8,34 @@ class RandomVar: def __init__(self, **kwargs): """ Initializing random variable objects - :param kwargs: dictionary of properties """ self.__dict__.update(kwargs) class Pruning: - """Pruning class: Creates the domain table for all the cells""" + """ + This class is an abstract class for general pruning, it requires for + every sub-class to implement the create_dataframe method + """ + __metaclass__ = ABCMeta - def __init__(self, session, - threshold1=0.1, - threshold2=0.3, - dk_breakoff=3, - clean_breakoff=10 - ): + def __init__(self, session): """ - Initializing the pruning object - + Initializing Featurizer object abstraction :param session: session object - :param threshold1: to limit possible values for training data - :param threshold2: to limit possible values for dirty data - :param dk_breakoff: to limit possible values for dirty data to - less than k values - :param clean_breakoff: to limit possible values for training data to - less than k values """ self.session = session self.spark_session = session.holo_env.spark_session self.dataengine = session.holo_env.dataengine - self.threshold1 = threshold1 - self.threshold2 = threshold2 - self.dk_breakoff = dk_breakoff - self.clean_breakoff = clean_breakoff - self.dataset = session.dataset - self.assignments = {} - self.cell_domain_nb = {} - self.domain_stats = {} - self.domain_pair_stats = {} - self.column_to_col_index_dict = {} - self.attribute_to_be_pruned = {} + self.cellvalues = self._c_values() self.dirty_cells_attributes = set([]) - self.coocurence_lookup = {} + self._d_cell() self.cell_domain = {} - self.all_cells = [] - self.all_cells_temp = {} - self.index = 0 - - self.cellvalues = self._c_values() - self.noisycells = self._d_cell() - t1 = time.time() - self._preprop() - t2 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_preprop " + str(t2 - t1)) - - self._analyze_entries() - t3 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_analyze_entries " + str(t3 - t2)) - - self._generate_assignments() - t4 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info( - "_generate_assignments " + str(t4 - t3)) - - self._generate_coocurences() - t5 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info( - "_generate_coocurences " + str(t5 - t4)) - - self._find_cell_domain() - t6 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_find_cell_domain " + str(t6 - t5)) - - self._create_dataframe() - t7 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_create_dataframe " + str(t7 - t6)) - - # Internal Method - def _d_cell(self): - """ - Creates don't know cells list from the C_dk table - - :return: list of don't know cells - """ - dataframe_dont_know = self.session.dk_df - noisy_cells = [] - for cell in dataframe_dont_know.collect(): - cell_variable = RandomVar(columnname=cell[1], row_id=int(cell[0])) - noisy_cells.append(cell_variable) - self.cellvalues[int(cell[0]) - 1][ - self.attribute_map[cell[1]]].dirty = 1 - - return noisy_cells def _c_values(self): """ Create cell value dictionary from the init table - :return: dictionary of cells values """ dataframe_init = self.session.init_dataset @@ -140,493 +64,35 @@ def _c_values(self): row_id = row_id + 1 return 
cell_values - def _compute_number_of_coocurences( - self, - original_attribute, - original_attr_value, - cooccured_attribute, - cooccured_attr_value): - """ - _compute_number_of_coocurences creates conditional probabilities for - each cell with the attribute and value of each other cell in the same - row - - :param original_attribute: the name of first attribute - :param original_attr_value: the initial value of the first attribute - :param cooccured_attribute: the name of second attribute - :param cooccured_attr_value: the initial value of the second attribute - - :return: probability - """ - if (original_attr_value, cooccured_attr_value) not in \ - self.domain_pair_stats[ - original_attribute][cooccured_attribute]: - return None - - cooccur_count = \ - self.domain_pair_stats[original_attribute][cooccured_attribute][( - original_attr_value, cooccured_attr_value)] - - value_count = self.domain_stats[original_attribute][ - original_attr_value] - - # Compute counter - if original_attr_value is None or cooccured_attr_value is None: - probability = 0 - else: - probability = cooccur_count / value_count - return probability - - # We need a new function like find_domain for clean cells - # such that it does not limit the domain to the possible values - # above the threshold - # first iteration, use a low threshold (i.e. 0) and limit using k - - def _find_dk_domain(self, assignment, trgt_attr): - """ - This method finds the domain for each dirty cell for inference - - - :param assignment: the values for every attribute - :param trgt_attr: the name of attribute - - :return: dictionary of cells values - """ - # cell_probabilities will hold domain values and their probabilities - cell_probabilities = [] - # always have the initial value in the returned domain values unless - # it is null - if assignment[trgt_attr] is not None: - cell_values = {(assignment[trgt_attr])} - else: - cell_values = {()} - for attr in assignment: - if attr == trgt_attr: - continue - attr_val = assignment[attr] - - if attr in self.coocurence_lookup: - if attr_val in self.coocurence_lookup[attr]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - cell_probabilities += \ - [(k, v) for - k, v in - self.coocurence_lookup[attr][attr_val][ - trgt_attr].iteritems()] - - # Sort cell_values and chop after k and chop below threshold2 - cell_probabilities.sort(key=lambda t: t[1], reverse=True) - - for tuple in cell_probabilities: - value = tuple[0] - probability = tuple[1] - if len(cell_values) == self.dk_breakoff or \ - probability < self.threshold2: - break - cell_values.add(value) - return cell_values - - def _find_clean_domain(self, assignment, trgt_attr): - """ - This method finds the domain for each clean cell for learning - - :param assignment: dictionary of values - :param trgt_attr: attribute of other column - - :return: dictionary of cells values - """ - cell_probabilities = [] - - # Always have the initial value in the returned domain values unless - # it is Null - if assignment[trgt_attr] is not None: - cell_values = {(assignment[trgt_attr])} - else: - cell_values = {()} - for attr in assignment: - if attr == trgt_attr: - continue - attr_val = assignment[attr] - - if attr in self.coocurence_lookup: - if attr_val in self.coocurence_lookup[attr]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - cell_probabilities += \ - [(k, v) for - k, v - in - self.coocurence_lookup[attr][attr_val][ - 
trgt_attr].iteritems()] - - # get l values from the lookup exactly like in dirty where l < k - # get k-l random once from the domain - cell_probabilities.sort(key=lambda t: t[1]) - while len(cell_probabilities) > 0: # for now l = k/2 - if len(cell_values) == self.clean_breakoff/2: - break - tuple = cell_probabilities.pop() - value = tuple[0] - cell_values.add(value) - - random.shuffle(cell_probabilities) - - while len(cell_probabilities) > 0: - if len(cell_values) == self.clean_breakoff: - break - tuple = cell_probabilities.pop() - value = tuple[0] - cell_values.add(value) - return cell_values - - def _preprop(self): - """ - Preprocessing phase: create the dictionary with all the attributes. - - :return: Null - """ - - # This part creates dictionary to find column index - - row_dictionary_element = self.cellvalues[1] - for cid in row_dictionary_element: - cell = row_dictionary_element[cid] - self.column_to_col_index_dict[cell.columnname] = cid - - # This part gets the attributes of noisy cells - for cell in self.noisycells: - self.dirty_cells_attributes.add(cell.columnname) - - # This part makes empty dictionary for each attribute - for col in self.column_to_col_index_dict: - self.domain_stats[col] = {} - - # This part adds other (dirty) attributes - # to the dictionary of the key attribute - for column_name_key in self.column_to_col_index_dict: - self.domain_pair_stats[column_name_key] = {} - for col2 in self.dirty_cells_attributes: - if col2 != column_name_key: - self.domain_pair_stats[column_name_key][col2] = {} - return - - def _analyze_entries(self): - """ - Analyze entries creates a dictionary with occurrences of the attributes - - :return: Null - """ - # Iterate over tuples to create to dictionary - for tupleid in self.cellvalues: - # Iterate over attributes and grab counts for create dictionary - # that how for each attribute how many times we see each value - for cid in self.cellvalues[tupleid]: - cell = self.cellvalues[tupleid][cid] - col = cell.columnname - val = cell.value - if col in self.dirty_cells_attributes: - self.all_cells.append(cell) - self.cellvalues[tupleid][cid].domain = 1 - self.all_cells_temp[cell.cellid] = cell - - if val not in self.domain_stats[col]: - self.domain_stats[col][val] = 0.0 - self.domain_stats[col][val] += 1.0 - - # Iterate over target attributes and grab - # counts of values with other attributes - for col in self.domain_pair_stats: - cid = self.column_to_col_index_dict[col] - for tgt_col in self.domain_pair_stats[col]: - tgt_cid = self.column_to_col_index_dict[tgt_col] - tgt_val = self.cellvalues[tupleid][tgt_cid].value - val = self.cellvalues[tupleid][cid].value - assgn_tuple = (val, tgt_val) - if assgn_tuple not in self.domain_pair_stats[col][tgt_col]: - self.domain_pair_stats[col][tgt_col][assgn_tuple] = 0.0 - self.domain_pair_stats[col][tgt_col][assgn_tuple] += 1.0 - return - - def _generate_coocurences(self): - """ - This method creates all value of co-occurences - - :return: Null - """ - for original_attribute in self.domain_pair_stats: - # For each column in the cooccurences - self.coocurence_lookup[original_attribute] = {} - # It creates a dictionary - for cooccured_attribute in \ - self.domain_pair_stats[original_attribute]: - # For second column in the cooccurences Over - # Pair of values that appeared together - # (original_attribute value , cooccured_attribute value) - for assgn_tuple in self.domain_pair_stats[ - original_attribute][ - cooccured_attribute]: - co_prob = self._compute_number_of_coocurences( - original_attribute, 
assgn_tuple[0], - cooccured_attribute, - assgn_tuple[1]) - - if co_prob > self.threshold1: - if assgn_tuple[0] not in \ - self.coocurence_lookup[ - original_attribute]: - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]] = {} - - if cooccured_attribute not in \ - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]]: - self.coocurence_lookup[ - original_attribute][ - assgn_tuple[0]][cooccured_attribute] = {} - - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]][ - cooccured_attribute][ - assgn_tuple[1]] = co_prob - return - - def _generate_assignments(self): + def _d_cell(self): """ - Generate_assignments creates assignment for each cell with the - attribute and value of each other cell in the same row - - :return: Null + Finds the attributes that appear in don't know cells and flags the cells + in cellvalues that are dirty + :return: None """ - for cell in self.all_cells: - tplid = cell.tupleid - trgt_attr = cell.columnname - - # assignment is a dictionary for each cell copied - # the row of that cell - # with all attribute to find the cooccurance - assignment = {} - for cid in self.cellvalues[tplid]: - c = self.cellvalues[tplid][cid] - assignment[c.columnname] = c.value - self.assignments[cell] = assignment - self.attribute_to_be_pruned[cell.cellid] = trgt_attr - return + dataframe_dont_know = self.session.dk_df + for cell in dataframe_dont_know.collect(): - def _find_cell_domain(self): - """ - Find_cell_domain finds the domain for each cell + # This part gets the attributes of noisy cells + self.dirty_cells_attributes.add(cell[1]) - :return: Null - """ - for cell in self.assignments: - # In this part we get all values for cell_index's - # attribute_to_be_pruned + self.cellvalues[int(cell[0]) - 1][ + self.attribute_map[cell[1]]].dirty = 1 - # if the cell is dirty call find domain - # else, call get negative examples (domain for clean cells) - if cell.dirty == 1: - self.cell_domain[cell.cellid] = self._find_dk_domain( - self.assignments[cell], - self.attribute_to_be_pruned[cell.cellid]) - else: - self.cell_domain[cell.cellid] = self._find_clean_domain( - self.assignments[cell], - self.attribute_to_be_pruned[cell.cellid]) return - def _append_possible(self, v_id, value, dataframe, cell_index, k_ij): + @abstractmethod + def get_domain(self): """ - Appends possible values to a list - - :param v_id: variable id - :param value: value - :param dataframe: list - :param cell_index: index of cell - :param k_ij: domain id + This method is used to populate the cell_domain to get the possible + values for each cell """ - if value != self.all_cells_temp[cell_index].value: - dataframe.append( - [v_id, (self.all_cells_temp[cell_index].tupleid + 1), - self.all_cells_temp[cell_index].columnname, - unicode(value), 0, k_ij]) - else: - dataframe.append( - [v_id, (self.all_cells_temp[cell_index].tupleid + 1), - self.all_cells_temp[cell_index].columnname, - unicode(value), 1, k_ij]) + pass + @abstractmethod def _create_dataframe(self): """ - Creates spark dataframes from cell_domain for all the cells - - :return: Null + Creates spark dataframes from cell_domain for all the cells, then creates + the possible values tables and kij_lookup for the domain in sql """ - - attributes = self.dataset.get_schema('Init') - domain_dict = {} - domain_kij_clean = [] - domain_kij_dk = [] - for attribute in attributes: - if attribute != GlobalVariables.index_name: - domain_dict[attribute] = set() - - possible_values_clean = [] - possible_values_dirty = [] - self.v_id_clean_list = [] - 
self.v_id_dk_list = [] - v_id_clean = v_id_dk = 0 - - for tuple_id in self.cellvalues: - for cell_index in self.cellvalues[tuple_id]: - attribute = self.cellvalues[tuple_id][cell_index].columnname - value = self.cellvalues[tuple_id][cell_index].value - domain_dict[attribute].add(value) - - if self.cellvalues[tuple_id][cell_index].dirty == 1: - tmp_cell_index = self.cellvalues[tuple_id][ - cell_index].cellid - if self.cellvalues[tuple_id][cell_index].domain == 1: - k_ij = 0 - v_id_dk = v_id_dk + 1 - - self.v_id_dk_list.append([( - self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.all_cells_temp[ - tmp_cell_index - ].columnname, - tmp_cell_index]) - - for value in self.cell_domain[tmp_cell_index]: - if value != (): - k_ij = k_ij + 1 - - self._append_possible( - v_id_dk, - value, - possible_values_dirty, - tmp_cell_index, k_ij - ) - - domain_kij_dk.append([v_id_dk, ( - self.all_cells_temp[tmp_cell_index].tupleid + 1), - self.all_cells_temp[tmp_cell_index].columnname, - k_ij]) - else: - - tmp_cell_index = self.cellvalues[tuple_id][ - cell_index].cellid - - if self.cellvalues[tuple_id][cell_index].domain == 1: - if len(self.cell_domain[tmp_cell_index]) > 1: - k_ij = 0 - v_id_clean = v_id_clean + 1 - - self.v_id_clean_list.append([ - (self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.all_cells_temp[ - tmp_cell_index - ].columnname, tmp_cell_index]) - - for value in self.cell_domain[tmp_cell_index]: - if value != 0: - k_ij = k_ij + 1 - - self._append_possible( - v_id_clean, - value, - possible_values_clean, - tmp_cell_index, k_ij) - - domain_kij_clean.append([ - v_id_clean, - (self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.all_cells_temp[ - tmp_cell_index - ].columnname, k_ij]) - - self.all_cells = None - self.all_cells_temp = None - - # Create possible table - df_possible_clean = self.spark_session.createDataFrame( - possible_values_clean, self.dataset.attributes['Possible_values'] - ) - - self.dataengine.add_db_table('Possible_values_clean', - df_possible_clean, self.dataset) - self.dataengine.add_db_table_index( - self.dataset.table_specific_name('Possible_values_clean'), - 'attr_name') - - df_possible_dk = self.spark_session.createDataFrame( - possible_values_dirty, self.dataset.attributes['Possible_values']) - - self.dataengine.add_db_table('Possible_values_dk', - df_possible_dk, self.dataset) - self.dataengine.add_db_table_index( - self.dataset.table_specific_name('Possible_values_dk'), - 'attr_name') - - df_kij = self.spark_session.createDataFrame( - domain_kij_dk, self.dataset.attributes['Kij_lookup']) - self.dataengine.add_db_table('Kij_lookup_dk', - df_kij, self.dataset) - - df_kij = self.spark_session.createDataFrame( - domain_kij_clean, self.dataset.attributes['Kij_lookup']) - self.dataengine.add_db_table('Kij_lookup_clean', - df_kij, self.dataset) - - self.dataengine.holo_env.logger.info('The table: ' + - self.dataset.table_specific_name( - 'Kij_lookup_clean') + - " has been created") - self.dataengine.holo_env.logger.info(" ") - self.dataengine.holo_env.logger.info('The table: ' + - self.dataset.table_specific_name( - 'Possible_values_dk') + - " has been created") - self.dataengine.holo_env.logger.info(" ") - - create_feature_id_map = "Create TABLE " + \ - self.dataset.table_specific_name( - "Feature_id_map") + \ - "( feature_ind INT," \ - " attribute VARCHAR(255)," \ - " value VARCHAR(255)," \ - " type VARCHAR(255) );" - - self.dataengine.query(create_feature_id_map) - - query_observed = "CREATE TABLE " + \ - 
self.dataset.table_specific_name( - 'Observed_Possible_values_clean') + \ - " AS SELECT * FROM " + \ - self.dataset.table_specific_name( - 'Possible_values_clean') + " as t1 " + \ - " WHERE " \ - " t1.observed=1;" - - self.dataengine.query(query_observed) - - query_observed = \ - "CREATE TABLE " + \ - self.dataset.table_specific_name('Observed_Possible_values_dk') + \ - " AS SELECT * FROM " + \ - self.dataset.table_specific_name('Possible_values_dk') + \ - " as t1 WHERE " \ - "t1.observed=1;" - - self.dataengine.query(query_observed) - self.assignments = None - self.attribute_to_be_pruned = None - self.attribute_map = None - - return +pass \ No newline at end of file From b619e766192666863133c2ee2a8e774083ee9090 Mon Sep 17 00:00:00 2001 From: gmichalo Date: Sun, 29 Apr 2018 17:56:41 -0400 Subject: [PATCH 3/8] creating cooccurpruning --- holoclean/holoclean.py | 8 +- holoclean/utils/cooccurpruning.py | 541 +++++++++++++++++++++++++++ holoclean/utils/pruning.py | 592 ++---------------------------- 3 files changed, 576 insertions(+), 565 deletions(-) create mode 100644 holoclean/utils/cooccurpruning.py diff --git a/holoclean/holoclean.py b/holoclean/holoclean.py index da2403d8..5635fdee 100644 --- a/holoclean/holoclean.py +++ b/holoclean/holoclean.py @@ -11,7 +11,7 @@ from dataengine import DataEngine from dataset import Dataset from featurization.database_worker import DatabaseWorker, PopulateTensor -from utils.pruning import Pruning +from utils.cooccurpruning import CooccurPruning from utils.parser_interface import ParserInterface, DenialConstraint import multiprocessing @@ -650,10 +650,14 @@ def _ds_domain_pruning(self, pruning_threshold1, pruning_threshold2, 'starting domain pruning with threshold %s', pruning_threshold1) - self.pruning = Pruning( + self.pruning = CooccurPruning( self, pruning_threshold1, pruning_threshold2, pruning_dk_breakoff, pruning_clean_breakoff) + + self.pruning.get_domain() + + self.pruning._create_dataframe() self.holo_env.logger.info('Domain pruning is finished :') return diff --git a/holoclean/utils/cooccurpruning.py b/holoclean/utils/cooccurpruning.py new file mode 100644 index 00000000..859e06eb --- /dev/null +++ b/holoclean/utils/cooccurpruning.py @@ -0,0 +1,541 @@ +import random +from holoclean.global_variables import GlobalVariables +from pruning import Pruning, RandomVar +import time + +class CooccurPruning(Pruning): + """Pruning class: Creates the domain table for all the cells""" + + def __init__(self, session, + threshold1=0.1, + threshold2=0.3, + dk_breakoff=3, + clean_breakoff=10 + ): + """ + Initializing the pruning object + :param session: session object + :param threshold1: to limit possible values for training data + :param threshold2: to limit possible values for dirty data + :param dk_breakoff: to limit possible values for dirty data to + less than k values + :param clean_breakoff: to limit possible values for training data to + less than k values + """ + super(CooccurPruning, self).__init__(session) + self.threshold1 = threshold1 + self.threshold2 = threshold2 + self.dk_breakoff = dk_breakoff + self.clean_breakoff = clean_breakoff + self.dataset = session.dataset + self.assignments = {} + self.cell_domain_nb = {} + self.domain_stats = {} + self.domain_pair_stats = {} + self.column_to_col_index_dict = {} + self.attribute_to_be_pruned = {} + self.coocurence_lookup = {} + self.all_cells = [] + self.all_cells_temp = {} + self.index = 0 + + # Internal Method + def _compute_number_of_coocurences( + self, + original_attribute, + original_attr_value, 
+ cooccured_attribute, + cooccured_attr_value): + """ + _compute_number_of_coocurences creates conditional probabilities for + each cell with the attribute and value of each other cell in the same + row + :param original_attribute: the name of first attribute + :param original_attr_value: the initial value of the first attribute + :param cooccured_attribute: the name of second attribute + :param cooccured_attr_value: the initial value of the second attribute + :return: probability + """ + if (original_attr_value, cooccured_attr_value) not in \ + self.domain_pair_stats[ + original_attribute][cooccured_attribute]: + return None + + cooccur_count = \ + self.domain_pair_stats[original_attribute][cooccured_attribute][( + original_attr_value, cooccured_attr_value)] + + value_count = self.domain_stats[original_attribute][ + original_attr_value] + + # Compute counter + if original_attr_value is None or cooccured_attr_value is None: + probability = 0 + else: + probability = cooccur_count / value_count + return probability + + # We need a new function like find_domain for clean cells + # such that it does not limit the domain to the possible values + # above the threshold + # first iteration, use a low threshold (i.e. 0) and limit using k + + def _find_dk_domain(self, assignment, trgt_attr): + """ + This method finds the domain for each dirty cell for inference + :param assignment: the values for every attribute + :param trgt_attr: the name of attribute + :return: dictionary of cells values + """ + # cell_probabilities will hold domain values and their probabilities + cell_probabilities = [] + # always have the initial value in the returned domain values unless + # it is null + if assignment[trgt_attr] is not None: + cell_values = {(assignment[trgt_attr])} + else: + cell_values = {()} + for attr in assignment: + if attr == trgt_attr: + continue + attr_val = assignment[attr] + + if attr in self.coocurence_lookup: + if attr_val in self.coocurence_lookup[attr]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + cell_probabilities += \ + [(k, v) for + k, v in + self.coocurence_lookup[attr][attr_val][ + trgt_attr].iteritems()] + + # Sort cell_values and chop after k and chop below threshold2 + cell_probabilities.sort(key=lambda t: t[1], reverse=True) + + for tuple in cell_probabilities: + value = tuple[0] + probability = tuple[1] + if len(cell_values) == self.dk_breakoff or \ + probability < self.threshold2: + break + cell_values.add(value) + return cell_values + + def _find_clean_domain(self, assignment, trgt_attr): + """ + This method finds the domain for each clean cell for learning + :param assignment: dictionary of values + :param trgt_attr: attribute of other column + :return: dictionary of cells values + """ + cell_probabilities = [] + + # Always have the initial value in the returned domain values unless + # it is Null + if assignment[trgt_attr] is not None: + cell_values = {(assignment[trgt_attr])} + else: + cell_values = {()} + for attr in assignment: + if attr == trgt_attr: + continue + attr_val = assignment[attr] + + if attr in self.coocurence_lookup: + if attr_val in self.coocurence_lookup[attr]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + cell_probabilities += \ + [(k, v) for + k, v + in + self.coocurence_lookup[attr][attr_val][ + trgt_attr].iteritems()] + + # get l values from the lookup exactly like in dirty where l < k + # get k-l random once from 
the domain + cell_probabilities.sort(key=lambda t: t[1]) + while len(cell_probabilities) > 0: # for now l = k/2 + if len(cell_values) == self.clean_breakoff/2: + break + tuple = cell_probabilities.pop() + value = tuple[0] + cell_values.add(value) + + random.shuffle(cell_probabilities) + + while len(cell_probabilities) > 0: + if len(cell_values) == self.clean_breakoff: + break + tuple = cell_probabilities.pop() + value = tuple[0] + cell_values.add(value) + return cell_values + + def _preprop(self): + """ + Preprocessing phase: create the dictionary with all the attributes. + :return: Null + """ + + # This part creates dictionary to find column index + + row_dictionary_element = self.cellvalues[1] + for cid in row_dictionary_element: + cell = row_dictionary_element[cid] + self.column_to_col_index_dict[cell.columnname] = cid + + # This part makes empty dictionary for each attribute + for col in self.column_to_col_index_dict: + self.domain_stats[col] = {} + + # This part adds other (dirty) attributes + # to the dictionary of the key attribute + for column_name_key in self.column_to_col_index_dict: + self.domain_pair_stats[column_name_key] = {} + for col2 in self.dirty_cells_attributes: + if col2 != column_name_key: + self.domain_pair_stats[column_name_key][col2] = {} + return + + def _analyze_entries(self): + """ + Analyze entries creates a dictionary with occurrences of the attributes + :return: Null + """ + # Iterate over tuples to create to dictionary + for tupleid in self.cellvalues: + # Iterate over attributes and grab counts for create dictionary + # that how for each attribute how many times we see each value + for cid in self.cellvalues[tupleid]: + cell = self.cellvalues[tupleid][cid] + col = cell.columnname + val = cell.value + if col in self.dirty_cells_attributes: + self.all_cells.append(cell) + self.cellvalues[tupleid][cid].domain = 1 + self.all_cells_temp[cell.cellid] = cell + + if val not in self.domain_stats[col]: + self.domain_stats[col][val] = 0.0 + self.domain_stats[col][val] += 1.0 + + # Iterate over target attributes and grab + # counts of values with other attributes + for col in self.domain_pair_stats: + cid = self.column_to_col_index_dict[col] + for tgt_col in self.domain_pair_stats[col]: + tgt_cid = self.column_to_col_index_dict[tgt_col] + tgt_val = self.cellvalues[tupleid][tgt_cid].value + val = self.cellvalues[tupleid][cid].value + assgn_tuple = (val, tgt_val) + if assgn_tuple not in self.domain_pair_stats[col][tgt_col]: + self.domain_pair_stats[col][tgt_col][assgn_tuple] = 0.0 + self.domain_pair_stats[col][tgt_col][assgn_tuple] += 1.0 + return + + def _generate_coocurences(self): + """ + This method creates all value of co-occurences + :return: Null + """ + for original_attribute in self.domain_pair_stats: + # For each column in the cooccurences + self.coocurence_lookup[original_attribute] = {} + # It creates a dictionary + for cooccured_attribute in \ + self.domain_pair_stats[original_attribute]: + # For second column in the cooccurences Over + # Pair of values that appeared together + # (original_attribute value , cooccured_attribute value) + for assgn_tuple in self.domain_pair_stats[ + original_attribute][ + cooccured_attribute]: + co_prob = self._compute_number_of_coocurences( + original_attribute, assgn_tuple[0], + cooccured_attribute, + assgn_tuple[1]) + + if co_prob > self.threshold1: + if assgn_tuple[0] not in \ + self.coocurence_lookup[ + original_attribute]: + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]] = {} + + if cooccured_attribute not 
in \ + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]]: + self.coocurence_lookup[ + original_attribute][ + assgn_tuple[0]][cooccured_attribute] = {} + + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]][ + cooccured_attribute][ + assgn_tuple[1]] = co_prob + return + + def _generate_assignments(self): + """ + Generate_assignments creates assignment for each cell with the + attribute and value of each other cell in the same row + :return: Null + """ + for cell in self.all_cells: + tplid = cell.tupleid + trgt_attr = cell.columnname + + # assignment is a dictionary for each cell copied + # the row of that cell + # with all attribute to find the cooccurance + assignment = {} + for cid in self.cellvalues[tplid]: + c = self.cellvalues[tplid][cid] + assignment[c.columnname] = c.value + self.assignments[cell] = assignment + self.attribute_to_be_pruned[cell.cellid] = trgt_attr + return + + def _find_cell_domain(self): + """ + Find_cell_domain finds the domain for each cell + :return: Null + """ + for cell in self.assignments: + # In this part we get all values for cell_index's + # attribute_to_be_pruned + + # if the cell is dirty call find domain + # else, call get negative examples (domain for clean cells) + if cell.dirty == 1: + self.cell_domain[cell.cellid] = self._find_dk_domain( + self.assignments[cell], + self.attribute_to_be_pruned[cell.cellid]) + else: + self.cell_domain[cell.cellid] = self._find_clean_domain( + self.assignments[cell], + self.attribute_to_be_pruned[cell.cellid]) + return + + def _append_possible(self, v_id, value, dataframe, cell_index, k_ij): + """ + Appends possible values to a list + :param v_id: variable id + :param value: value + :param dataframe: list + :param cell_index: index of cell + :param k_ij: domain id + """ + if value != self.all_cells_temp[cell_index].value: + dataframe.append( + [v_id, (self.all_cells_temp[cell_index].tupleid + 1), + self.all_cells_temp[cell_index].columnname, + unicode(value), 0, k_ij]) + else: + dataframe.append( + [v_id, (self.all_cells_temp[cell_index].tupleid + 1), + self.all_cells_temp[cell_index].columnname, + unicode(value), 1, k_ij]) + + def _create_dataframe(self): + """ + Creates spark dataframes from cell_domain for all the cells + :return: Null + """ + + attributes = self.dataset.get_schema('Init') + domain_dict = {} + domain_kij_clean = [] + domain_kij_dk = [] + for attribute in attributes: + if attribute != GlobalVariables.index_name: + domain_dict[attribute] = set() + + possible_values_clean = [] + possible_values_dirty = [] + self.v_id_clean_list = [] + self.v_id_dk_list = [] + v_id_clean = v_id_dk = 0 + + for tuple_id in self.cellvalues: + for cell_index in self.cellvalues[tuple_id]: + attribute = self.cellvalues[tuple_id][cell_index].columnname + value = self.cellvalues[tuple_id][cell_index].value + domain_dict[attribute].add(value) + + if self.cellvalues[tuple_id][cell_index].dirty == 1: + tmp_cell_index = self.cellvalues[tuple_id][ + cell_index].cellid + if self.cellvalues[tuple_id][cell_index].domain == 1: + k_ij = 0 + v_id_dk = v_id_dk + 1 + + self.v_id_dk_list.append([( + self.cellvalues[tuple_id][cell_index].tupleid +1), + self.cellvalues[tuple_id][cell_index].columnname, + tmp_cell_index]) + + for value in self.cell_domain[tmp_cell_index]: + if value != (): + k_ij = k_ij + 1 + + self._append_possible( + v_id_dk, + value, + possible_values_dirty, + tmp_cell_index, k_ij + ) + + domain_kij_dk.append([v_id_dk, ( + self.all_cells_temp[tmp_cell_index].tupleid + 1), + 
self.all_cells_temp[tmp_cell_index].columnname, + k_ij]) + else: + + tmp_cell_index = self.cellvalues[tuple_id][ + cell_index].cellid + + if self.cellvalues[tuple_id][cell_index].domain == 1: + if len(self.cell_domain[tmp_cell_index]) > 1: + k_ij = 0 + v_id_clean = v_id_clean + 1 + + self.v_id_clean_list.append([ + (self.all_cells_temp[ + tmp_cell_index + ].tupleid + 1), + self.cellvalues[tuple_id][cell_index].columnname, tmp_cell_index]) + + for value in self.cell_domain[tmp_cell_index]: + if value != 0: + k_ij = k_ij + 1 + + self._append_possible( + v_id_clean, + value, + possible_values_clean, + tmp_cell_index, k_ij) + + domain_kij_clean.append([ + v_id_clean, + (self.all_cells_temp[ + tmp_cell_index + ].tupleid + 1), + self.cellvalues[tuple_id][cell_index].columnname, k_ij]) + + self.all_cells = None + self.all_cells_temp = None + + # Create possible table + df_possible_clean = self.spark_session.createDataFrame( + possible_values_clean, self.dataset.attributes['Possible_values'] + ) + + self.dataengine.add_db_table('Possible_values_clean', + df_possible_clean, self.dataset) + self.dataengine.add_db_table_index( + self.dataset.table_specific_name('Possible_values_clean'), + 'attr_name') + + df_possible_dk = self.spark_session.createDataFrame( + possible_values_dirty, self.dataset.attributes['Possible_values']) + + self.dataengine.add_db_table('Possible_values_dk', + df_possible_dk, self.dataset) + self.dataengine.add_db_table_index( + self.dataset.table_specific_name('Possible_values_dk'), + 'attr_name') + + df_kij = self.spark_session.createDataFrame( + domain_kij_dk, self.dataset.attributes['Kij_lookup']) + self.dataengine.add_db_table('Kij_lookup_dk', + df_kij, self.dataset) + + df_kij = self.spark_session.createDataFrame( + domain_kij_clean, self.dataset.attributes['Kij_lookup']) + self.dataengine.add_db_table('Kij_lookup_clean', + df_kij, self.dataset) + + self.dataengine.holo_env.logger.info('The table: ' + + self.dataset.table_specific_name( + 'Kij_lookup_clean') + + " has been created") + self.dataengine.holo_env.logger.info(" ") + self.dataengine.holo_env.logger.info('The table: ' + + self.dataset.table_specific_name( + 'Possible_values_dk') + + " has been created") + self.dataengine.holo_env.logger.info(" ") + + create_feature_id_map = "Create TABLE " + \ + self.dataset.table_specific_name( + "Feature_id_map") + \ + "( feature_ind INT," \ + " attribute VARCHAR(255)," \ + " value VARCHAR(255)," \ + " type VARCHAR(255) );" + + self.dataengine.query(create_feature_id_map) + + query_observed = "CREATE TABLE " + \ + self.dataset.table_specific_name( + 'Observed_Possible_values_clean') + \ + " AS SELECT * FROM " + \ + self.dataset.table_specific_name( + 'Possible_values_clean') + " as t1 " + \ + " WHERE " \ + " t1.observed=1;" + + self.dataengine.query(query_observed) + + query_observed = \ + "CREATE TABLE " + \ + self.dataset.table_specific_name('Observed_Possible_values_dk') + \ + " AS SELECT * FROM " + \ + self.dataset.table_specific_name('Possible_values_dk') + \ + " as t1 WHERE " \ + "t1.observed=1;" + + self.dataengine.query(query_observed) + self.assignments = None + self.attribute_to_be_pruned = None + self.attribute_map = None + + return + + def get_domain(self): + """ + :return: + :rtype: + """ + session = self.session + + t1 = time.time() + self._preprop() + t2 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_preprop " + str(t2 - t1)) + + self._analyze_entries() + t3 = time.time() + if session.holo_env.verbose: + 
session.holo_env.logger.info("_analyze_entries " + str(t3 - t2)) + + self._generate_assignments() + t4 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info( + "_generate_assignments " + str(t4 - t3)) + + self._generate_coocurences() + t5 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info( + "_generate_coocurences " + str(t5 - t4)) + + self._find_cell_domain() + t6 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_find_cell_domain " + str(t6 - t5)) \ No newline at end of file diff --git a/holoclean/utils/pruning.py b/holoclean/utils/pruning.py index e87bef28..8f27c378 100644 --- a/holoclean/utils/pruning.py +++ b/holoclean/utils/pruning.py @@ -1,6 +1,5 @@ -import random +from abc import ABCMeta, abstractmethod from holoclean.global_variables import GlobalVariables -import time class RandomVar: @@ -9,109 +8,34 @@ class RandomVar: def __init__(self, **kwargs): """ Initializing random variable objects - :param kwargs: dictionary of properties """ self.__dict__.update(kwargs) class Pruning: - """Pruning class: Creates the domain table for all the cells""" + """ + This class is an abstract class for general pruning, it requires for + every sub-class to implement the create_dataframe method + """ + __metaclass__ = ABCMeta - def __init__(self, session, - threshold1=0.1, - threshold2=0.3, - dk_breakoff=3, - clean_breakoff=10 - ): + def __init__(self, session): """ - Initializing the pruning object - + Initializing Featurizer object abstraction :param session: session object - :param threshold1: to limit possible values for training data - :param threshold2: to limit possible values for dirty data - :param dk_breakoff: to limit possible values for dirty data to - less than k values - :param clean_breakoff: to limit possible values for training data to - less than k values """ self.session = session self.spark_session = session.holo_env.spark_session self.dataengine = session.holo_env.dataengine - self.threshold1 = threshold1 - self.threshold2 = threshold2 - self.dk_breakoff = dk_breakoff - self.clean_breakoff = clean_breakoff - self.dataset = session.dataset - self.assignments = {} - self.cell_domain_nb = {} - self.domain_stats = {} - self.domain_pair_stats = {} - self.column_to_col_index_dict = {} - self.attribute_to_be_pruned = {} + self.cellvalues = self._c_values() self.dirty_cells_attributes = set([]) - self.coocurence_lookup = {} + self._d_cell() self.cell_domain = {} - self.all_cells = [] - self.all_cells_temp = {} - self.index = 0 - - self.cellvalues = self._c_values() - self.noisycells = self._d_cell() - t1 = time.time() - self._preprop() - t2 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_preprop " + str(t2 - t1)) - - self._analyze_entries() - t3 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_analyze_entries " + str(t3 - t2)) - - self._generate_assignments() - t4 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info( - "_generate_assignments " + str(t4 - t3)) - - self._generate_coocurences() - t5 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info( - "_generate_coocurences " + str(t5 - t4)) - - self._find_cell_domain() - t6 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_find_cell_domain " + str(t6 - t5)) - - self._create_dataframe() - t7 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_create_dataframe " + str(t7 - t6)) - - # Internal Method - def 
_d_cell(self): - """ - Creates don't know cells list from the C_dk table - - :return: list of don't know cells - """ - dataframe_dont_know = self.session.dk_df - noisy_cells = [] - for cell in dataframe_dont_know.collect(): - cell_variable = RandomVar(columnname=cell[1], row_id=int(cell[0])) - noisy_cells.append(cell_variable) - self.cellvalues[int(cell[0]) - 1][ - self.attribute_map[cell[1]]].dirty = 1 - - return noisy_cells def _c_values(self): """ Create cell value dictionary from the init table - :return: dictionary of cells values """ dataframe_init = self.session.init_dataset @@ -140,493 +64,35 @@ def _c_values(self): row_id = row_id + 1 return cell_values - def _compute_number_of_coocurences( - self, - original_attribute, - original_attr_value, - cooccured_attribute, - cooccured_attr_value): - """ - _compute_number_of_coocurences creates conditional probabilities for - each cell with the attribute and value of each other cell in the same - row - - :param original_attribute: the name of first attribute - :param original_attr_value: the initial value of the first attribute - :param cooccured_attribute: the name of second attribute - :param cooccured_attr_value: the initial value of the second attribute - - :return: probability - """ - if (original_attr_value, cooccured_attr_value) not in \ - self.domain_pair_stats[ - original_attribute][cooccured_attribute]: - return None - - cooccur_count = \ - self.domain_pair_stats[original_attribute][cooccured_attribute][( - original_attr_value, cooccured_attr_value)] - - value_count = self.domain_stats[original_attribute][ - original_attr_value] - - # Compute counter - if original_attr_value is None or cooccured_attr_value is None: - probability = 0 - else: - probability = cooccur_count / value_count - return probability - - # We need a new function like find_domain for clean cells - # such that it does not limit the domain to the possible values - # above the threshold - # first iteration, use a low threshold (i.e. 
0) and limit using k - - def _find_dk_domain(self, assignment, trgt_attr): - """ - This method finds the domain for each dirty cell for inference - - - :param assignment: the values for every attribute - :param trgt_attr: the name of attribute - - :return: dictionary of cells values - """ - # cell_probabilities will hold domain values and their probabilities - cell_probabilities = [] - # always have the initial value in the returned domain values unless - # it is null - if assignment[trgt_attr] is not None: - cell_values = {(assignment[trgt_attr])} - else: - cell_values = {()} - for attr in assignment: - if attr == trgt_attr: - continue - attr_val = assignment[attr] - - if attr in self.coocurence_lookup: - if attr_val in self.coocurence_lookup[attr]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - cell_probabilities += \ - [(k, v) for - k, v in - self.coocurence_lookup[attr][attr_val][ - trgt_attr].iteritems()] - - # Sort cell_values and chop after k and chop below threshold2 - cell_probabilities.sort(key=lambda t: t[1], reverse=True) - - for tuple in cell_probabilities: - value = tuple[0] - probability = tuple[1] - if len(cell_values) == self.dk_breakoff or \ - probability < self.threshold2: - break - cell_values.add(value) - return cell_values - - def _find_clean_domain(self, assignment, trgt_attr): - """ - This method finds the domain for each clean cell for learning - - :param assignment: dictionary of values - :param trgt_attr: attribute of other column - - :return: dictionary of cells values - """ - cell_probabilities = [] - - # Always have the initial value in the returned domain values unless - # it is Null - if assignment[trgt_attr] is not None: - cell_values = {(assignment[trgt_attr])} - else: - cell_values = {()} - for attr in assignment: - if attr == trgt_attr: - continue - attr_val = assignment[attr] - - if attr in self.coocurence_lookup: - if attr_val in self.coocurence_lookup[attr]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - cell_probabilities += \ - [(k, v) for - k, v - in - self.coocurence_lookup[attr][attr_val][ - trgt_attr].iteritems()] - - # get l values from the lookup exactly like in dirty where l < k - # get k-l random once from the domain - cell_probabilities.sort(key=lambda t: t[1]) - while len(cell_probabilities) > 0: # for now l = k/2 - if len(cell_values) == self.clean_breakoff/2: - break - tuple = cell_probabilities.pop() - value = tuple[0] - cell_values.add(value) - - random.shuffle(cell_probabilities) - - while len(cell_probabilities) > 0: - if len(cell_values) == self.clean_breakoff: - break - tuple = cell_probabilities.pop() - value = tuple[0] - cell_values.add(value) - return cell_values - - def _preprop(self): - """ - Preprocessing phase: create the dictionary with all the attributes. 
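For reference, the co-occurrence statistic this pruning relies on is just a conditional probability estimated from raw counts. A minimal standalone sketch, assuming toy dictionaries pair_counts ({(v1, v2): count}) and value_counts ({v1: count}) in place of domain_pair_stats and domain_stats; the attribute values are illustrative only:

    # toy counts for one attribute pair, e.g. city -> state
    pair_counts = {("Toronto", "ON"): 40.0, ("Toronto", "NY"): 2.0, ("Boston", "MA"): 25.0}
    value_counts = {"Toronto": 42.0, "Boston": 25.0}

    def cooccur_probability(v1, v2):
        # P(second value = v2 | first value = v1) = count(v1, v2) / count(v1)
        if (v1, v2) not in pair_counts:
            return None
        if v1 is None or v2 is None:
            return 0.0
        return pair_counts[(v1, v2)] / value_counts[v1]

    # only pairs whose probability clears threshold1 survive into the lookup
    threshold1 = 0.1
    lookup = {}
    for (v1, v2) in pair_counts:
        p = cooccur_probability(v1, v2)
        if p > threshold1:                        # 40/42 survives, 2/42 does not
            lookup.setdefault(v1, {})[v2] = p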
- - :return: Null - """ - - # This part creates dictionary to find column index - - row_dictionary_element = self.cellvalues[1] - for cid in row_dictionary_element: - cell = row_dictionary_element[cid] - self.column_to_col_index_dict[cell.columnname] = cid - - # This part gets the attributes of noisy cells - for cell in self.noisycells: - self.dirty_cells_attributes.add(cell.columnname) - - # This part makes empty dictionary for each attribute - for col in self.column_to_col_index_dict: - self.domain_stats[col] = {} - - # This part adds other (dirty) attributes - # to the dictionary of the key attribute - for column_name_key in self.column_to_col_index_dict: - self.domain_pair_stats[column_name_key] = {} - for col2 in self.dirty_cells_attributes: - if col2 != column_name_key: - self.domain_pair_stats[column_name_key][col2] = {} - return - - def _analyze_entries(self): - """ - Analyze entries creates a dictionary with occurrences of the attributes - - :return: Null - """ - # Iterate over tuples to create to dictionary - for tupleid in self.cellvalues: - # Iterate over attributes and grab counts for create dictionary - # that how for each attribute how many times we see each value - for cid in self.cellvalues[tupleid]: - cell = self.cellvalues[tupleid][cid] - col = cell.columnname - val = cell.value - if col in self.dirty_cells_attributes: - self.all_cells.append(cell) - self.cellvalues[tupleid][cid].domain = 1 - self.all_cells_temp[cell.cellid] = cell - - if val not in self.domain_stats[col]: - self.domain_stats[col][val] = 0.0 - self.domain_stats[col][val] += 1.0 - - # Iterate over target attributes and grab - # counts of values with other attributes - for col in self.domain_pair_stats: - cid = self.column_to_col_index_dict[col] - for tgt_col in self.domain_pair_stats[col]: - tgt_cid = self.column_to_col_index_dict[tgt_col] - tgt_val = self.cellvalues[tupleid][tgt_cid].value - val = self.cellvalues[tupleid][cid].value - assgn_tuple = (val, tgt_val) - if assgn_tuple not in self.domain_pair_stats[col][tgt_col]: - self.domain_pair_stats[col][tgt_col][assgn_tuple] = 0.0 - self.domain_pair_stats[col][tgt_col][assgn_tuple] += 1.0 - return - - def _generate_coocurences(self): - """ - This method creates all value of co-occurences - - :return: Null - """ - for original_attribute in self.domain_pair_stats: - # For each column in the cooccurences - self.coocurence_lookup[original_attribute] = {} - # It creates a dictionary - for cooccured_attribute in \ - self.domain_pair_stats[original_attribute]: - # For second column in the cooccurences Over - # Pair of values that appeared together - # (original_attribute value , cooccured_attribute value) - for assgn_tuple in self.domain_pair_stats[ - original_attribute][ - cooccured_attribute]: - co_prob = self._compute_number_of_coocurences( - original_attribute, assgn_tuple[0], - cooccured_attribute, - assgn_tuple[1]) - - if co_prob > self.threshold1: - if assgn_tuple[0] not in \ - self.coocurence_lookup[ - original_attribute]: - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]] = {} - - if cooccured_attribute not in \ - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]]: - self.coocurence_lookup[ - original_attribute][ - assgn_tuple[0]][cooccured_attribute] = {} - - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]][ - cooccured_attribute][ - assgn_tuple[1]] = co_prob - return - - def _generate_assignments(self): + def _d_cell(self): """ - Generate_assignments creates assignment for each cell with the - attribute and value 
of each other cell in the same row - - :return: Null + Finds the attributes that appear in don't know cells and flags the cells + in cellvalues that are dirty + :return: None """ - for cell in self.all_cells: - tplid = cell.tupleid - trgt_attr = cell.columnname - - # assignment is a dictionary for each cell copied - # the row of that cell - # with all attribute to find the cooccurance - assignment = {} - for cid in self.cellvalues[tplid]: - c = self.cellvalues[tplid][cid] - assignment[c.columnname] = c.value - self.assignments[cell] = assignment - self.attribute_to_be_pruned[cell.cellid] = trgt_attr - return + dataframe_dont_know = self.session.dk_df + for cell in dataframe_dont_know.collect(): - def _find_cell_domain(self): - """ - Find_cell_domain finds the domain for each cell + # This part gets the attributes of noisy cells + self.dirty_cells_attributes.add(cell[1]) - :return: Null - """ - for cell in self.assignments: - # In this part we get all values for cell_index's - # attribute_to_be_pruned + self.cellvalues[int(cell[0]) - 1][ + self.attribute_map[cell[1]]].dirty = 1 - # if the cell is dirty call find domain - # else, call get negative examples (domain for clean cells) - if cell.dirty == 1: - self.cell_domain[cell.cellid] = self._find_dk_domain( - self.assignments[cell], - self.attribute_to_be_pruned[cell.cellid]) - else: - self.cell_domain[cell.cellid] = self._find_clean_domain( - self.assignments[cell], - self.attribute_to_be_pruned[cell.cellid]) return - def _append_possible(self, v_id, value, dataframe, cell_index, k_ij): + @abstractmethod + def get_domain(self): """ - Appends possible values to a list - - :param v_id: variable id - :param value: value - :param dataframe: list - :param cell_index: index of cell - :param k_ij: domain id + This method is used to populate the cell_domain to get the possible + values for each cell """ - if value != self.all_cells_temp[cell_index].value: - dataframe.append( - [v_id, (self.all_cells_temp[cell_index].tupleid + 1), - self.all_cells_temp[cell_index].columnname, - unicode(value), 0, k_ij]) - else: - dataframe.append( - [v_id, (self.all_cells_temp[cell_index].tupleid + 1), - self.all_cells_temp[cell_index].columnname, - unicode(value), 1, k_ij]) + pass + @abstractmethod def _create_dataframe(self): """ - Creates spark dataframes from cell_domain for all the cells - - :return: Null + Creates spark dataframes from cell_domain for all the cells, then creates + the possible values tables and kij_lookup for the domain in sql """ - - attributes = self.dataset.get_schema('Init') - domain_dict = {} - domain_kij_clean = [] - domain_kij_dk = [] - for attribute in attributes: - if attribute != GlobalVariables.index_name: - domain_dict[attribute] = set() - - possible_values_clean = [] - possible_values_dirty = [] - self.v_id_clean_list = [] - self.v_id_dk_list = [] - v_id_clean = v_id_dk = 0 - - for tuple_id in self.cellvalues: - for cell_index in self.cellvalues[tuple_id]: - attribute = self.cellvalues[tuple_id][cell_index].columnname - value = self.cellvalues[tuple_id][cell_index].value - domain_dict[attribute].add(value) - - if self.cellvalues[tuple_id][cell_index].dirty == 1: - tmp_cell_index = self.cellvalues[tuple_id][ - cell_index].cellid - if self.cellvalues[tuple_id][cell_index].domain == 1: - k_ij = 0 - v_id_dk = v_id_dk + 1 - - self.v_id_dk_list.append([( - self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.all_cells_temp[ - tmp_cell_index - ].columnname, - tmp_cell_index]) - - for value in self.cell_domain[tmp_cell_index]: - 
if value != (): - k_ij = k_ij + 1 - - self._append_possible( - v_id_dk, - value, - possible_values_dirty, - tmp_cell_index, k_ij - ) - - domain_kij_dk.append([v_id_dk, ( - self.all_cells_temp[tmp_cell_index].tupleid + 1), - self.all_cells_temp[tmp_cell_index].columnname, - k_ij]) - else: - - tmp_cell_index = self.cellvalues[tuple_id][ - cell_index].cellid - - if self.cellvalues[tuple_id][cell_index].domain == 1: - if len(self.cell_domain[tmp_cell_index]) > 1: - k_ij = 0 - v_id_clean = v_id_clean + 1 - - self.v_id_clean_list.append([ - (self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.all_cells_temp[ - tmp_cell_index - ].columnname, tmp_cell_index]) - - for value in self.cell_domain[tmp_cell_index]: - if value != 0: - k_ij = k_ij + 1 - - self._append_possible( - v_id_clean, - value, - possible_values_clean, - tmp_cell_index, k_ij) - - domain_kij_clean.append([ - v_id_clean, - (self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.all_cells_temp[ - tmp_cell_index - ].columnname, k_ij]) - - self.all_cells = None - self.all_cells_temp = None - - # Create possible table - df_possible_clean = self.spark_session.createDataFrame( - possible_values_clean, self.dataset.attributes['Possible_values'] - ) - - self.dataengine.add_db_table('Possible_values_clean', - df_possible_clean, self.dataset) - self.dataengine.add_db_table_index( - self.dataset.table_specific_name('Possible_values_clean'), - 'attr_name') - - df_possible_dk = self.spark_session.createDataFrame( - possible_values_dirty, self.dataset.attributes['Possible_values']) - - self.dataengine.add_db_table('Possible_values_dk', - df_possible_dk, self.dataset) - self.dataengine.add_db_table_index( - self.dataset.table_specific_name('Possible_values_dk'), - 'attr_name') - - df_kij = self.spark_session.createDataFrame( - domain_kij_dk, self.dataset.attributes['Kij_lookup']) - self.dataengine.add_db_table('Kij_lookup_dk', - df_kij, self.dataset) - - df_kij = self.spark_session.createDataFrame( - domain_kij_clean, self.dataset.attributes['Kij_lookup']) - self.dataengine.add_db_table('Kij_lookup_clean', - df_kij, self.dataset) - - self.dataengine.holo_env.logger.info('The table: ' + - self.dataset.table_specific_name( - 'Kij_lookup_clean') + - " has been created") - self.dataengine.holo_env.logger.info(" ") - self.dataengine.holo_env.logger.info('The table: ' + - self.dataset.table_specific_name( - 'Possible_values_dk') + - " has been created") - self.dataengine.holo_env.logger.info(" ") - - create_feature_id_map = "Create TABLE " + \ - self.dataset.table_specific_name( - "Feature_id_map") + \ - "( feature_ind INT," \ - " attribute VARCHAR(255)," \ - " value VARCHAR(255)," \ - " type VARCHAR(255) );" - - self.dataengine.query(create_feature_id_map) - - query_observed = "CREATE TABLE " + \ - self.dataset.table_specific_name( - 'Observed_Possible_values_clean') + \ - " AS SELECT * FROM " + \ - self.dataset.table_specific_name( - 'Possible_values_clean') + " as t1 " + \ - " WHERE " \ - " t1.observed=1;" - - self.dataengine.query(query_observed) - - query_observed = \ - "CREATE TABLE " + \ - self.dataset.table_specific_name('Observed_Possible_values_dk') + \ - " AS SELECT * FROM " + \ - self.dataset.table_specific_name('Possible_values_dk') + \ - " as t1 WHERE " \ - "t1.observed=1;" - - self.dataengine.query(query_observed) - self.assignments = None - self.attribute_to_be_pruned = None - self.attribute_map = None - - return + pass \ No newline at end of file From 777b79bbf9db45597088ce88bc2b391320a14868 Mon Sep 17 00:00:00 2001 
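The refactor above leaves Pruning as an abstract base class whose constructor does the shared ingestion (_c_values, _d_cell) while each concrete strategy supplies get_domain and _create_dataframe. A minimal sketch of that pattern, using the Python 2 __metaclass__ spelling the codebase uses; BasePruning and ToyPruning are illustrative names, not classes from the repository:

    from abc import ABCMeta, abstractmethod

    class BasePruning(object):
        __metaclass__ = ABCMeta          # Python 2 way of declaring the ABC

        @abstractmethod
        def get_domain(self):
            """Populate the candidate-value domain for every cell."""

    class ToyPruning(BasePruning):
        def get_domain(self):
            # a concrete strategy decides how candidate values are produced
            return {"cell_0": {"Toronto", "Boston"}}

    print(ToyPruning().get_domain())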
From: gmichalo Date: Sun, 29 Apr 2018 21:39:21 -0400 Subject: [PATCH 4/8] forked repo --- holoclean/holoclean.py | 8 +- holoclean/utils/cooccurpruning.py | 541 --------------------------- holoclean/utils/pruning.py | 592 ++++++++++++++++++++++++++++-- 3 files changed, 565 insertions(+), 576 deletions(-) delete mode 100644 holoclean/utils/cooccurpruning.py diff --git a/holoclean/holoclean.py b/holoclean/holoclean.py index 5635fdee..da2403d8 100644 --- a/holoclean/holoclean.py +++ b/holoclean/holoclean.py @@ -11,7 +11,7 @@ from dataengine import DataEngine from dataset import Dataset from featurization.database_worker import DatabaseWorker, PopulateTensor -from utils.cooccurpruning import CooccurPruning +from utils.pruning import Pruning from utils.parser_interface import ParserInterface, DenialConstraint import multiprocessing @@ -650,14 +650,10 @@ def _ds_domain_pruning(self, pruning_threshold1, pruning_threshold2, 'starting domain pruning with threshold %s', pruning_threshold1) - self.pruning = CooccurPruning( + self.pruning = Pruning( self, pruning_threshold1, pruning_threshold2, pruning_dk_breakoff, pruning_clean_breakoff) - - self.pruning.get_domain() - - self.pruning._create_dataframe() self.holo_env.logger.info('Domain pruning is finished :') return diff --git a/holoclean/utils/cooccurpruning.py b/holoclean/utils/cooccurpruning.py deleted file mode 100644 index 859e06eb..00000000 --- a/holoclean/utils/cooccurpruning.py +++ /dev/null @@ -1,541 +0,0 @@ -import random -from holoclean.global_variables import GlobalVariables -from pruning import Pruning, RandomVar -import time - -class CooccurPruning(Pruning): - """Pruning class: Creates the domain table for all the cells""" - - def __init__(self, session, - threshold1=0.1, - threshold2=0.3, - dk_breakoff=3, - clean_breakoff=10 - ): - """ - Initializing the pruning object - :param session: session object - :param threshold1: to limit possible values for training data - :param threshold2: to limit possible values for dirty data - :param dk_breakoff: to limit possible values for dirty data to - less than k values - :param clean_breakoff: to limit possible values for training data to - less than k values - """ - super(CooccurPruning, self).__init__(session) - self.threshold1 = threshold1 - self.threshold2 = threshold2 - self.dk_breakoff = dk_breakoff - self.clean_breakoff = clean_breakoff - self.dataset = session.dataset - self.assignments = {} - self.cell_domain_nb = {} - self.domain_stats = {} - self.domain_pair_stats = {} - self.column_to_col_index_dict = {} - self.attribute_to_be_pruned = {} - self.coocurence_lookup = {} - self.all_cells = [] - self.all_cells_temp = {} - self.index = 0 - - # Internal Method - def _compute_number_of_coocurences( - self, - original_attribute, - original_attr_value, - cooccured_attribute, - cooccured_attr_value): - """ - _compute_number_of_coocurences creates conditional probabilities for - each cell with the attribute and value of each other cell in the same - row - :param original_attribute: the name of first attribute - :param original_attr_value: the initial value of the first attribute - :param cooccured_attribute: the name of second attribute - :param cooccured_attr_value: the initial value of the second attribute - :return: probability - """ - if (original_attr_value, cooccured_attr_value) not in \ - self.domain_pair_stats[ - original_attribute][cooccured_attribute]: - return None - - cooccur_count = \ - self.domain_pair_stats[original_attribute][cooccured_attribute][( - original_attr_value, 
cooccured_attr_value)] - - value_count = self.domain_stats[original_attribute][ - original_attr_value] - - # Compute counter - if original_attr_value is None or cooccured_attr_value is None: - probability = 0 - else: - probability = cooccur_count / value_count - return probability - - # We need a new function like find_domain for clean cells - # such that it does not limit the domain to the possible values - # above the threshold - # first iteration, use a low threshold (i.e. 0) and limit using k - - def _find_dk_domain(self, assignment, trgt_attr): - """ - This method finds the domain for each dirty cell for inference - :param assignment: the values for every attribute - :param trgt_attr: the name of attribute - :return: dictionary of cells values - """ - # cell_probabilities will hold domain values and their probabilities - cell_probabilities = [] - # always have the initial value in the returned domain values unless - # it is null - if assignment[trgt_attr] is not None: - cell_values = {(assignment[trgt_attr])} - else: - cell_values = {()} - for attr in assignment: - if attr == trgt_attr: - continue - attr_val = assignment[attr] - - if attr in self.coocurence_lookup: - if attr_val in self.coocurence_lookup[attr]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - cell_probabilities += \ - [(k, v) for - k, v in - self.coocurence_lookup[attr][attr_val][ - trgt_attr].iteritems()] - - # Sort cell_values and chop after k and chop below threshold2 - cell_probabilities.sort(key=lambda t: t[1], reverse=True) - - for tuple in cell_probabilities: - value = tuple[0] - probability = tuple[1] - if len(cell_values) == self.dk_breakoff or \ - probability < self.threshold2: - break - cell_values.add(value) - return cell_values - - def _find_clean_domain(self, assignment, trgt_attr): - """ - This method finds the domain for each clean cell for learning - :param assignment: dictionary of values - :param trgt_attr: attribute of other column - :return: dictionary of cells values - """ - cell_probabilities = [] - - # Always have the initial value in the returned domain values unless - # it is Null - if assignment[trgt_attr] is not None: - cell_values = {(assignment[trgt_attr])} - else: - cell_values = {()} - for attr in assignment: - if attr == trgt_attr: - continue - attr_val = assignment[attr] - - if attr in self.coocurence_lookup: - if attr_val in self.coocurence_lookup[attr]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - if trgt_attr in self.coocurence_lookup[attr][attr_val]: - cell_probabilities += \ - [(k, v) for - k, v - in - self.coocurence_lookup[attr][attr_val][ - trgt_attr].iteritems()] - - # get l values from the lookup exactly like in dirty where l < k - # get k-l random once from the domain - cell_probabilities.sort(key=lambda t: t[1]) - while len(cell_probabilities) > 0: # for now l = k/2 - if len(cell_values) == self.clean_breakoff/2: - break - tuple = cell_probabilities.pop() - value = tuple[0] - cell_values.add(value) - - random.shuffle(cell_probabilities) - - while len(cell_probabilities) > 0: - if len(cell_values) == self.clean_breakoff: - break - tuple = cell_probabilities.pop() - value = tuple[0] - cell_values.add(value) - return cell_values - - def _preprop(self): - """ - Preprocessing phase: create the dictionary with all the attributes. 
- :return: Null - """ - - # This part creates dictionary to find column index - - row_dictionary_element = self.cellvalues[1] - for cid in row_dictionary_element: - cell = row_dictionary_element[cid] - self.column_to_col_index_dict[cell.columnname] = cid - - # This part makes empty dictionary for each attribute - for col in self.column_to_col_index_dict: - self.domain_stats[col] = {} - - # This part adds other (dirty) attributes - # to the dictionary of the key attribute - for column_name_key in self.column_to_col_index_dict: - self.domain_pair_stats[column_name_key] = {} - for col2 in self.dirty_cells_attributes: - if col2 != column_name_key: - self.domain_pair_stats[column_name_key][col2] = {} - return - - def _analyze_entries(self): - """ - Analyze entries creates a dictionary with occurrences of the attributes - :return: Null - """ - # Iterate over tuples to create to dictionary - for tupleid in self.cellvalues: - # Iterate over attributes and grab counts for create dictionary - # that how for each attribute how many times we see each value - for cid in self.cellvalues[tupleid]: - cell = self.cellvalues[tupleid][cid] - col = cell.columnname - val = cell.value - if col in self.dirty_cells_attributes: - self.all_cells.append(cell) - self.cellvalues[tupleid][cid].domain = 1 - self.all_cells_temp[cell.cellid] = cell - - if val not in self.domain_stats[col]: - self.domain_stats[col][val] = 0.0 - self.domain_stats[col][val] += 1.0 - - # Iterate over target attributes and grab - # counts of values with other attributes - for col in self.domain_pair_stats: - cid = self.column_to_col_index_dict[col] - for tgt_col in self.domain_pair_stats[col]: - tgt_cid = self.column_to_col_index_dict[tgt_col] - tgt_val = self.cellvalues[tupleid][tgt_cid].value - val = self.cellvalues[tupleid][cid].value - assgn_tuple = (val, tgt_val) - if assgn_tuple not in self.domain_pair_stats[col][tgt_col]: - self.domain_pair_stats[col][tgt_col][assgn_tuple] = 0.0 - self.domain_pair_stats[col][tgt_col][assgn_tuple] += 1.0 - return - - def _generate_coocurences(self): - """ - This method creates all value of co-occurences - :return: Null - """ - for original_attribute in self.domain_pair_stats: - # For each column in the cooccurences - self.coocurence_lookup[original_attribute] = {} - # It creates a dictionary - for cooccured_attribute in \ - self.domain_pair_stats[original_attribute]: - # For second column in the cooccurences Over - # Pair of values that appeared together - # (original_attribute value , cooccured_attribute value) - for assgn_tuple in self.domain_pair_stats[ - original_attribute][ - cooccured_attribute]: - co_prob = self._compute_number_of_coocurences( - original_attribute, assgn_tuple[0], - cooccured_attribute, - assgn_tuple[1]) - - if co_prob > self.threshold1: - if assgn_tuple[0] not in \ - self.coocurence_lookup[ - original_attribute]: - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]] = {} - - if cooccured_attribute not in \ - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]]: - self.coocurence_lookup[ - original_attribute][ - assgn_tuple[0]][cooccured_attribute] = {} - - self.coocurence_lookup[ - original_attribute][assgn_tuple[0]][ - cooccured_attribute][ - assgn_tuple[1]] = co_prob - return - - def _generate_assignments(self): - """ - Generate_assignments creates assignment for each cell with the - attribute and value of each other cell in the same row - :return: Null - """ - for cell in self.all_cells: - tplid = cell.tupleid - trgt_attr = cell.columnname - - # assignment 
is a dictionary for each cell copied - # the row of that cell - # with all attribute to find the cooccurance - assignment = {} - for cid in self.cellvalues[tplid]: - c = self.cellvalues[tplid][cid] - assignment[c.columnname] = c.value - self.assignments[cell] = assignment - self.attribute_to_be_pruned[cell.cellid] = trgt_attr - return - - def _find_cell_domain(self): - """ - Find_cell_domain finds the domain for each cell - :return: Null - """ - for cell in self.assignments: - # In this part we get all values for cell_index's - # attribute_to_be_pruned - - # if the cell is dirty call find domain - # else, call get negative examples (domain for clean cells) - if cell.dirty == 1: - self.cell_domain[cell.cellid] = self._find_dk_domain( - self.assignments[cell], - self.attribute_to_be_pruned[cell.cellid]) - else: - self.cell_domain[cell.cellid] = self._find_clean_domain( - self.assignments[cell], - self.attribute_to_be_pruned[cell.cellid]) - return - - def _append_possible(self, v_id, value, dataframe, cell_index, k_ij): - """ - Appends possible values to a list - :param v_id: variable id - :param value: value - :param dataframe: list - :param cell_index: index of cell - :param k_ij: domain id - """ - if value != self.all_cells_temp[cell_index].value: - dataframe.append( - [v_id, (self.all_cells_temp[cell_index].tupleid + 1), - self.all_cells_temp[cell_index].columnname, - unicode(value), 0, k_ij]) - else: - dataframe.append( - [v_id, (self.all_cells_temp[cell_index].tupleid + 1), - self.all_cells_temp[cell_index].columnname, - unicode(value), 1, k_ij]) - - def _create_dataframe(self): - """ - Creates spark dataframes from cell_domain for all the cells - :return: Null - """ - - attributes = self.dataset.get_schema('Init') - domain_dict = {} - domain_kij_clean = [] - domain_kij_dk = [] - for attribute in attributes: - if attribute != GlobalVariables.index_name: - domain_dict[attribute] = set() - - possible_values_clean = [] - possible_values_dirty = [] - self.v_id_clean_list = [] - self.v_id_dk_list = [] - v_id_clean = v_id_dk = 0 - - for tuple_id in self.cellvalues: - for cell_index in self.cellvalues[tuple_id]: - attribute = self.cellvalues[tuple_id][cell_index].columnname - value = self.cellvalues[tuple_id][cell_index].value - domain_dict[attribute].add(value) - - if self.cellvalues[tuple_id][cell_index].dirty == 1: - tmp_cell_index = self.cellvalues[tuple_id][ - cell_index].cellid - if self.cellvalues[tuple_id][cell_index].domain == 1: - k_ij = 0 - v_id_dk = v_id_dk + 1 - - self.v_id_dk_list.append([( - self.cellvalues[tuple_id][cell_index].tupleid +1), - self.cellvalues[tuple_id][cell_index].columnname, - tmp_cell_index]) - - for value in self.cell_domain[tmp_cell_index]: - if value != (): - k_ij = k_ij + 1 - - self._append_possible( - v_id_dk, - value, - possible_values_dirty, - tmp_cell_index, k_ij - ) - - domain_kij_dk.append([v_id_dk, ( - self.all_cells_temp[tmp_cell_index].tupleid + 1), - self.all_cells_temp[tmp_cell_index].columnname, - k_ij]) - else: - - tmp_cell_index = self.cellvalues[tuple_id][ - cell_index].cellid - - if self.cellvalues[tuple_id][cell_index].domain == 1: - if len(self.cell_domain[tmp_cell_index]) > 1: - k_ij = 0 - v_id_clean = v_id_clean + 1 - - self.v_id_clean_list.append([ - (self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.cellvalues[tuple_id][cell_index].columnname, tmp_cell_index]) - - for value in self.cell_domain[tmp_cell_index]: - if value != 0: - k_ij = k_ij + 1 - - self._append_possible( - v_id_clean, - value, - possible_values_clean, - 
tmp_cell_index, k_ij) - - domain_kij_clean.append([ - v_id_clean, - (self.all_cells_temp[ - tmp_cell_index - ].tupleid + 1), - self.cellvalues[tuple_id][cell_index].columnname, k_ij]) - - self.all_cells = None - self.all_cells_temp = None - - # Create possible table - df_possible_clean = self.spark_session.createDataFrame( - possible_values_clean, self.dataset.attributes['Possible_values'] - ) - - self.dataengine.add_db_table('Possible_values_clean', - df_possible_clean, self.dataset) - self.dataengine.add_db_table_index( - self.dataset.table_specific_name('Possible_values_clean'), - 'attr_name') - - df_possible_dk = self.spark_session.createDataFrame( - possible_values_dirty, self.dataset.attributes['Possible_values']) - - self.dataengine.add_db_table('Possible_values_dk', - df_possible_dk, self.dataset) - self.dataengine.add_db_table_index( - self.dataset.table_specific_name('Possible_values_dk'), - 'attr_name') - - df_kij = self.spark_session.createDataFrame( - domain_kij_dk, self.dataset.attributes['Kij_lookup']) - self.dataengine.add_db_table('Kij_lookup_dk', - df_kij, self.dataset) - - df_kij = self.spark_session.createDataFrame( - domain_kij_clean, self.dataset.attributes['Kij_lookup']) - self.dataengine.add_db_table('Kij_lookup_clean', - df_kij, self.dataset) - - self.dataengine.holo_env.logger.info('The table: ' + - self.dataset.table_specific_name( - 'Kij_lookup_clean') + - " has been created") - self.dataengine.holo_env.logger.info(" ") - self.dataengine.holo_env.logger.info('The table: ' + - self.dataset.table_specific_name( - 'Possible_values_dk') + - " has been created") - self.dataengine.holo_env.logger.info(" ") - - create_feature_id_map = "Create TABLE " + \ - self.dataset.table_specific_name( - "Feature_id_map") + \ - "( feature_ind INT," \ - " attribute VARCHAR(255)," \ - " value VARCHAR(255)," \ - " type VARCHAR(255) );" - - self.dataengine.query(create_feature_id_map) - - query_observed = "CREATE TABLE " + \ - self.dataset.table_specific_name( - 'Observed_Possible_values_clean') + \ - " AS SELECT * FROM " + \ - self.dataset.table_specific_name( - 'Possible_values_clean') + " as t1 " + \ - " WHERE " \ - " t1.observed=1;" - - self.dataengine.query(query_observed) - - query_observed = \ - "CREATE TABLE " + \ - self.dataset.table_specific_name('Observed_Possible_values_dk') + \ - " AS SELECT * FROM " + \ - self.dataset.table_specific_name('Possible_values_dk') + \ - " as t1 WHERE " \ - "t1.observed=1;" - - self.dataengine.query(query_observed) - self.assignments = None - self.attribute_to_be_pruned = None - self.attribute_map = None - - return - - def get_domain(self): - """ - :return: - :rtype: - """ - session = self.session - - t1 = time.time() - self._preprop() - t2 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_preprop " + str(t2 - t1)) - - self._analyze_entries() - t3 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_analyze_entries " + str(t3 - t2)) - - self._generate_assignments() - t4 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info( - "_generate_assignments " + str(t4 - t3)) - - self._generate_coocurences() - t5 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info( - "_generate_coocurences " + str(t5 - t4)) - - self._find_cell_domain() - t6 = time.time() - if session.holo_env.verbose: - session.holo_env.logger.info("_find_cell_domain " + str(t6 - t5)) \ No newline at end of file diff --git a/holoclean/utils/pruning.py b/holoclean/utils/pruning.py index 
8f27c378..e87bef28 100644 --- a/holoclean/utils/pruning.py +++ b/holoclean/utils/pruning.py @@ -1,5 +1,6 @@ -from abc import ABCMeta, abstractmethod +import random from holoclean.global_variables import GlobalVariables +import time class RandomVar: @@ -8,34 +9,109 @@ class RandomVar: def __init__(self, **kwargs): """ Initializing random variable objects + :param kwargs: dictionary of properties """ self.__dict__.update(kwargs) class Pruning: - """ - This class is an abstract class for general pruning, it requires for - every sub-class to implement the create_dataframe method - """ - __metaclass__ = ABCMeta + """Pruning class: Creates the domain table for all the cells""" - def __init__(self, session): + def __init__(self, session, + threshold1=0.1, + threshold2=0.3, + dk_breakoff=3, + clean_breakoff=10 + ): """ - Initializing Featurizer object abstraction + Initializing the pruning object + :param session: session object + :param threshold1: to limit possible values for training data + :param threshold2: to limit possible values for dirty data + :param dk_breakoff: to limit possible values for dirty data to + less than k values + :param clean_breakoff: to limit possible values for training data to + less than k values """ self.session = session self.spark_session = session.holo_env.spark_session self.dataengine = session.holo_env.dataengine - self.cellvalues = self._c_values() + self.threshold1 = threshold1 + self.threshold2 = threshold2 + self.dk_breakoff = dk_breakoff + self.clean_breakoff = clean_breakoff + self.dataset = session.dataset + self.assignments = {} + self.cell_domain_nb = {} + self.domain_stats = {} + self.domain_pair_stats = {} + self.column_to_col_index_dict = {} + self.attribute_to_be_pruned = {} self.dirty_cells_attributes = set([]) - self._d_cell() + self.coocurence_lookup = {} self.cell_domain = {} + self.all_cells = [] + self.all_cells_temp = {} + self.index = 0 + + self.cellvalues = self._c_values() + self.noisycells = self._d_cell() + t1 = time.time() + self._preprop() + t2 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_preprop " + str(t2 - t1)) + + self._analyze_entries() + t3 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_analyze_entries " + str(t3 - t2)) + + self._generate_assignments() + t4 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info( + "_generate_assignments " + str(t4 - t3)) + + self._generate_coocurences() + t5 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info( + "_generate_coocurences " + str(t5 - t4)) + + self._find_cell_domain() + t6 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_find_cell_domain " + str(t6 - t5)) + + self._create_dataframe() + t7 = time.time() + if session.holo_env.verbose: + session.holo_env.logger.info("_create_dataframe " + str(t7 - t6)) + + # Internal Method + def _d_cell(self): + """ + Creates don't know cells list from the C_dk table + + :return: list of don't know cells + """ + dataframe_dont_know = self.session.dk_df + noisy_cells = [] + for cell in dataframe_dont_know.collect(): + cell_variable = RandomVar(columnname=cell[1], row_id=int(cell[0])) + noisy_cells.append(cell_variable) + self.cellvalues[int(cell[0]) - 1][ + self.attribute_map[cell[1]]].dirty = 1 + + return noisy_cells def _c_values(self): """ Create cell value dictionary from the init table + :return: dictionary of cells values """ dataframe_init = self.session.init_dataset @@ -64,35 +140,493 @@ def _c_values(self): 
row_id = row_id + 1 return cell_values - def _d_cell(self): + def _compute_number_of_coocurences( + self, + original_attribute, + original_attr_value, + cooccured_attribute, + cooccured_attr_value): """ - Finds the attributes that appear in don't know cells and flags the cells - in cellvalues that are dirty - :return: None + _compute_number_of_coocurences creates conditional probabilities for + each cell with the attribute and value of each other cell in the same + row + + :param original_attribute: the name of first attribute + :param original_attr_value: the initial value of the first attribute + :param cooccured_attribute: the name of second attribute + :param cooccured_attr_value: the initial value of the second attribute + + :return: probability """ - dataframe_dont_know = self.session.dk_df - for cell in dataframe_dont_know.collect(): + if (original_attr_value, cooccured_attr_value) not in \ + self.domain_pair_stats[ + original_attribute][cooccured_attribute]: + return None - # This part gets the attributes of noisy cells - self.dirty_cells_attributes.add(cell[1]) + cooccur_count = \ + self.domain_pair_stats[original_attribute][cooccured_attribute][( + original_attr_value, cooccured_attr_value)] - self.cellvalues[int(cell[0]) - 1][ - self.attribute_map[cell[1]]].dirty = 1 + value_count = self.domain_stats[original_attribute][ + original_attr_value] + + # Compute counter + if original_attr_value is None or cooccured_attr_value is None: + probability = 0 + else: + probability = cooccur_count / value_count + return probability + + # We need a new function like find_domain for clean cells + # such that it does not limit the domain to the possible values + # above the threshold + # first iteration, use a low threshold (i.e. 0) and limit using k + + def _find_dk_domain(self, assignment, trgt_attr): + """ + This method finds the domain for each dirty cell for inference + + + :param assignment: the values for every attribute + :param trgt_attr: the name of attribute + + :return: dictionary of cells values + """ + # cell_probabilities will hold domain values and their probabilities + cell_probabilities = [] + # always have the initial value in the returned domain values unless + # it is null + if assignment[trgt_attr] is not None: + cell_values = {(assignment[trgt_attr])} + else: + cell_values = {()} + for attr in assignment: + if attr == trgt_attr: + continue + attr_val = assignment[attr] + + if attr in self.coocurence_lookup: + if attr_val in self.coocurence_lookup[attr]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + cell_probabilities += \ + [(k, v) for + k, v in + self.coocurence_lookup[attr][attr_val][ + trgt_attr].iteritems()] + + # Sort cell_values and chop after k and chop below threshold2 + cell_probabilities.sort(key=lambda t: t[1], reverse=True) + + for tuple in cell_probabilities: + value = tuple[0] + probability = tuple[1] + if len(cell_values) == self.dk_breakoff or \ + probability < self.threshold2: + break + cell_values.add(value) + return cell_values + + def _find_clean_domain(self, assignment, trgt_attr): + """ + This method finds the domain for each clean cell for learning + + :param assignment: dictionary of values + :param trgt_attr: attribute of other column + + :return: dictionary of cells values + """ + cell_probabilities = [] + + # Always have the initial value in the returned domain values unless + # it is Null + if assignment[trgt_attr] is not None: + cell_values = {(assignment[trgt_attr])} + 
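The dirty-cell and clean-cell selection rules differ only in how they cut the scored candidates: dirty cells keep at most dk_breakoff values whose probability clears threshold2, while clean cells take the top half of clean_breakoff by probability and then pad with random candidates so the learner also sees lower-probability values. A minimal standalone sketch, assuming scored is the list of (value, probability) pairs pulled from the co-occurrence lookup and init_value is the cell's current value; the defaults mirror the constructor (threshold2=0.3, dk_breakoff=3, clean_breakoff=10):

    import random

    def dk_domain(scored, init_value, k=3, threshold2=0.3):
        # dirty cells: initial value plus at most k high-probability candidates
        domain = {init_value} if init_value is not None else {()}
        for value, prob in sorted(scored, key=lambda t: t[1], reverse=True):
            if len(domain) >= k or prob < threshold2:
                break
            domain.add(value)
        return domain

    def clean_domain(scored, init_value, k=10):
        # clean cells: top k/2 by probability, then random picks up to k
        domain = {init_value} if init_value is not None else {()}
        ranked = sorted(scored, key=lambda t: t[1])   # ascending; pop() yields the best
        while ranked and len(domain) < k // 2:
            domain.add(ranked.pop()[0])
        random.shuffle(ranked)
        while ranked and len(domain) < k:
            domain.add(ranked.pop()[0])
        return domain

    scored = [("ON", 0.95), ("NY", 0.05), ("MA", 0.4), ("QC", 0.2)]
    print(dk_domain(scored, "ON"))            # {'ON', 'MA'}
    print(clean_domain(scored, "ON", k=4))    # initial value plus three more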
else: + cell_values = {()} + for attr in assignment: + if attr == trgt_attr: + continue + attr_val = assignment[attr] + + if attr in self.coocurence_lookup: + if attr_val in self.coocurence_lookup[attr]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + if trgt_attr in self.coocurence_lookup[attr][attr_val]: + cell_probabilities += \ + [(k, v) for + k, v + in + self.coocurence_lookup[attr][attr_val][ + trgt_attr].iteritems()] + + # get l values from the lookup exactly like in dirty where l < k + # get k-l random once from the domain + cell_probabilities.sort(key=lambda t: t[1]) + while len(cell_probabilities) > 0: # for now l = k/2 + if len(cell_values) == self.clean_breakoff/2: + break + tuple = cell_probabilities.pop() + value = tuple[0] + cell_values.add(value) + + random.shuffle(cell_probabilities) + + while len(cell_probabilities) > 0: + if len(cell_values) == self.clean_breakoff: + break + tuple = cell_probabilities.pop() + value = tuple[0] + cell_values.add(value) + return cell_values + + def _preprop(self): + """ + Preprocessing phase: create the dictionary with all the attributes. + + :return: Null + """ + + # This part creates dictionary to find column index + + row_dictionary_element = self.cellvalues[1] + for cid in row_dictionary_element: + cell = row_dictionary_element[cid] + self.column_to_col_index_dict[cell.columnname] = cid + + # This part gets the attributes of noisy cells + for cell in self.noisycells: + self.dirty_cells_attributes.add(cell.columnname) + # This part makes empty dictionary for each attribute + for col in self.column_to_col_index_dict: + self.domain_stats[col] = {} + + # This part adds other (dirty) attributes + # to the dictionary of the key attribute + for column_name_key in self.column_to_col_index_dict: + self.domain_pair_stats[column_name_key] = {} + for col2 in self.dirty_cells_attributes: + if col2 != column_name_key: + self.domain_pair_stats[column_name_key][col2] = {} + return + + def _analyze_entries(self): + """ + Analyze entries creates a dictionary with occurrences of the attributes + + :return: Null + """ + # Iterate over tuples to create to dictionary + for tupleid in self.cellvalues: + # Iterate over attributes and grab counts for create dictionary + # that how for each attribute how many times we see each value + for cid in self.cellvalues[tupleid]: + cell = self.cellvalues[tupleid][cid] + col = cell.columnname + val = cell.value + if col in self.dirty_cells_attributes: + self.all_cells.append(cell) + self.cellvalues[tupleid][cid].domain = 1 + self.all_cells_temp[cell.cellid] = cell + + if val not in self.domain_stats[col]: + self.domain_stats[col][val] = 0.0 + self.domain_stats[col][val] += 1.0 + + # Iterate over target attributes and grab + # counts of values with other attributes + for col in self.domain_pair_stats: + cid = self.column_to_col_index_dict[col] + for tgt_col in self.domain_pair_stats[col]: + tgt_cid = self.column_to_col_index_dict[tgt_col] + tgt_val = self.cellvalues[tupleid][tgt_cid].value + val = self.cellvalues[tupleid][cid].value + assgn_tuple = (val, tgt_val) + if assgn_tuple not in self.domain_pair_stats[col][tgt_col]: + self.domain_pair_stats[col][tgt_col][assgn_tuple] = 0.0 + self.domain_pair_stats[col][tgt_col][assgn_tuple] += 1.0 + return + + def _generate_coocurences(self): + """ + This method creates all value of co-occurences + + :return: Null + """ + for original_attribute in self.domain_pair_stats: + # For each column in the cooccurences + self.coocurence_lookup[original_attribute] = {} + # It 
creates a dictionary + for cooccured_attribute in \ + self.domain_pair_stats[original_attribute]: + # For second column in the cooccurences Over + # Pair of values that appeared together + # (original_attribute value , cooccured_attribute value) + for assgn_tuple in self.domain_pair_stats[ + original_attribute][ + cooccured_attribute]: + co_prob = self._compute_number_of_coocurences( + original_attribute, assgn_tuple[0], + cooccured_attribute, + assgn_tuple[1]) + + if co_prob > self.threshold1: + if assgn_tuple[0] not in \ + self.coocurence_lookup[ + original_attribute]: + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]] = {} + + if cooccured_attribute not in \ + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]]: + self.coocurence_lookup[ + original_attribute][ + assgn_tuple[0]][cooccured_attribute] = {} + + self.coocurence_lookup[ + original_attribute][assgn_tuple[0]][ + cooccured_attribute][ + assgn_tuple[1]] = co_prob + return + + def _generate_assignments(self): + """ + Generate_assignments creates assignment for each cell with the + attribute and value of each other cell in the same row + + :return: Null + """ + for cell in self.all_cells: + tplid = cell.tupleid + trgt_attr = cell.columnname + + # assignment is a dictionary for each cell copied + # the row of that cell + # with all attribute to find the cooccurance + assignment = {} + for cid in self.cellvalues[tplid]: + c = self.cellvalues[tplid][cid] + assignment[c.columnname] = c.value + self.assignments[cell] = assignment + self.attribute_to_be_pruned[cell.cellid] = trgt_attr return - @abstractmethod - def get_domain(self): + def _find_cell_domain(self): """ - This method is used to populate the cell_domain to get the possible - values for each cell + Find_cell_domain finds the domain for each cell + + :return: Null """ - pass + for cell in self.assignments: + # In this part we get all values for cell_index's + # attribute_to_be_pruned + + # if the cell is dirty call find domain + # else, call get negative examples (domain for clean cells) + if cell.dirty == 1: + self.cell_domain[cell.cellid] = self._find_dk_domain( + self.assignments[cell], + self.attribute_to_be_pruned[cell.cellid]) + else: + self.cell_domain[cell.cellid] = self._find_clean_domain( + self.assignments[cell], + self.attribute_to_be_pruned[cell.cellid]) + return + + def _append_possible(self, v_id, value, dataframe, cell_index, k_ij): + """ + Appends possible values to a list + + :param v_id: variable id + :param value: value + :param dataframe: list + :param cell_index: index of cell + :param k_ij: domain id + """ + if value != self.all_cells_temp[cell_index].value: + dataframe.append( + [v_id, (self.all_cells_temp[cell_index].tupleid + 1), + self.all_cells_temp[cell_index].columnname, + unicode(value), 0, k_ij]) + else: + dataframe.append( + [v_id, (self.all_cells_temp[cell_index].tupleid + 1), + self.all_cells_temp[cell_index].columnname, + unicode(value), 1, k_ij]) - @abstractmethod def _create_dataframe(self): """ - Creates spark dataframes from cell_domain for all the cells, then creates - the possible values tables and kij_lookup for the domain in sql + Creates spark dataframes from cell_domain for all the cells + + :return: Null """ - pass \ No newline at end of file + + attributes = self.dataset.get_schema('Init') + domain_dict = {} + domain_kij_clean = [] + domain_kij_dk = [] + for attribute in attributes: + if attribute != GlobalVariables.index_name: + domain_dict[attribute] = set() + + possible_values_clean = [] + 
possible_values_dirty = [] + self.v_id_clean_list = [] + self.v_id_dk_list = [] + v_id_clean = v_id_dk = 0 + + for tuple_id in self.cellvalues: + for cell_index in self.cellvalues[tuple_id]: + attribute = self.cellvalues[tuple_id][cell_index].columnname + value = self.cellvalues[tuple_id][cell_index].value + domain_dict[attribute].add(value) + + if self.cellvalues[tuple_id][cell_index].dirty == 1: + tmp_cell_index = self.cellvalues[tuple_id][ + cell_index].cellid + if self.cellvalues[tuple_id][cell_index].domain == 1: + k_ij = 0 + v_id_dk = v_id_dk + 1 + + self.v_id_dk_list.append([( + self.all_cells_temp[ + tmp_cell_index + ].tupleid + 1), + self.all_cells_temp[ + tmp_cell_index + ].columnname, + tmp_cell_index]) + + for value in self.cell_domain[tmp_cell_index]: + if value != (): + k_ij = k_ij + 1 + + self._append_possible( + v_id_dk, + value, + possible_values_dirty, + tmp_cell_index, k_ij + ) + + domain_kij_dk.append([v_id_dk, ( + self.all_cells_temp[tmp_cell_index].tupleid + 1), + self.all_cells_temp[tmp_cell_index].columnname, + k_ij]) + else: + + tmp_cell_index = self.cellvalues[tuple_id][ + cell_index].cellid + + if self.cellvalues[tuple_id][cell_index].domain == 1: + if len(self.cell_domain[tmp_cell_index]) > 1: + k_ij = 0 + v_id_clean = v_id_clean + 1 + + self.v_id_clean_list.append([ + (self.all_cells_temp[ + tmp_cell_index + ].tupleid + 1), + self.all_cells_temp[ + tmp_cell_index + ].columnname, tmp_cell_index]) + + for value in self.cell_domain[tmp_cell_index]: + if value != 0: + k_ij = k_ij + 1 + + self._append_possible( + v_id_clean, + value, + possible_values_clean, + tmp_cell_index, k_ij) + + domain_kij_clean.append([ + v_id_clean, + (self.all_cells_temp[ + tmp_cell_index + ].tupleid + 1), + self.all_cells_temp[ + tmp_cell_index + ].columnname, k_ij]) + + self.all_cells = None + self.all_cells_temp = None + + # Create possible table + df_possible_clean = self.spark_session.createDataFrame( + possible_values_clean, self.dataset.attributes['Possible_values'] + ) + + self.dataengine.add_db_table('Possible_values_clean', + df_possible_clean, self.dataset) + self.dataengine.add_db_table_index( + self.dataset.table_specific_name('Possible_values_clean'), + 'attr_name') + + df_possible_dk = self.spark_session.createDataFrame( + possible_values_dirty, self.dataset.attributes['Possible_values']) + + self.dataengine.add_db_table('Possible_values_dk', + df_possible_dk, self.dataset) + self.dataengine.add_db_table_index( + self.dataset.table_specific_name('Possible_values_dk'), + 'attr_name') + + df_kij = self.spark_session.createDataFrame( + domain_kij_dk, self.dataset.attributes['Kij_lookup']) + self.dataengine.add_db_table('Kij_lookup_dk', + df_kij, self.dataset) + + df_kij = self.spark_session.createDataFrame( + domain_kij_clean, self.dataset.attributes['Kij_lookup']) + self.dataengine.add_db_table('Kij_lookup_clean', + df_kij, self.dataset) + + self.dataengine.holo_env.logger.info('The table: ' + + self.dataset.table_specific_name( + 'Kij_lookup_clean') + + " has been created") + self.dataengine.holo_env.logger.info(" ") + self.dataengine.holo_env.logger.info('The table: ' + + self.dataset.table_specific_name( + 'Possible_values_dk') + + " has been created") + self.dataengine.holo_env.logger.info(" ") + + create_feature_id_map = "Create TABLE " + \ + self.dataset.table_specific_name( + "Feature_id_map") + \ + "( feature_ind INT," \ + " attribute VARCHAR(255)," \ + " value VARCHAR(255)," \ + " type VARCHAR(255) );" + + self.dataengine.query(create_feature_id_map) + + 
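As a rough guide to what the accumulated lists hold before they become Spark dataframes: the real column names come from self.dataset.attributes['Possible_values'] and ['Kij_lookup'], and only attr_name and observed are visible in this diff, so the other names below are descriptive stand-ins:

    # one Possible_values row per candidate value of a cell
    # [vid, tid, attr_name, attr_val, observed, domain_id]
    # observed is 1 only when the candidate equals the cell's initial value
    possible_rows = [
        [1, 5, "State", u"ON", 1, 1],
        [1, 5, "State", u"MA", 0, 2],
    ]

    # one Kij_lookup row per cell, recording its domain size k_ij
    # [vid, tid, attr_name, k_ij]
    kij_rows = [
        [1, 5, "State", 2],
    ]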
query_observed = "CREATE TABLE " + \ + self.dataset.table_specific_name( + 'Observed_Possible_values_clean') + \ + " AS SELECT * FROM " + \ + self.dataset.table_specific_name( + 'Possible_values_clean') + " as t1 " + \ + " WHERE " \ + " t1.observed=1;" + + self.dataengine.query(query_observed) + + query_observed = \ + "CREATE TABLE " + \ + self.dataset.table_specific_name('Observed_Possible_values_dk') + \ + " AS SELECT * FROM " + \ + self.dataset.table_specific_name('Possible_values_dk') + \ + " as t1 WHERE " \ + "t1.observed=1;" + + self.dataengine.query(query_observed) + self.assignments = None + self.attribute_to_be_pruned = None + self.attribute_map = None + + return From b6574a5fdc64cd936a212e9ca09c27819fb8e2c3 Mon Sep 17 00:00:00 2001 From: gmichalo Date: Sun, 6 May 2018 19:26:14 -0400 Subject: [PATCH 5/8] removing unnecessary file --- reader.py | 164 ------------------------------------------------------ 1 file changed, 164 deletions(-) delete mode 100644 reader.py diff --git a/reader.py b/reader.py deleted file mode 100644 index dd21f2b3..00000000 --- a/reader.py +++ /dev/null @@ -1,164 +0,0 @@ -from holoclean.global_variables import GlobalVariables -from pyspark.sql.functions import * -from pyspark.sql.types import StructField, StructType, StringType, LongType - - -class Reader: - - """ - Reader class: - Finds the extension of the file and calls the appropriate reader - """ - - def __init__(self, spark_session): - """ - Constructing reader object - - :param spark_session: The spark_session we created in Holoclean object - """ - self.spark_session = spark_session - - # Internal Methods - def _findextesion(self, filepath): - """ - Finds the extesion of the file. - - :param filepath: The path to the file - """ - extention = filepath.split('.')[-1] - return extention - - def read(self, filepath, indexcol=0, schema=None): - """ - Calls the appropriate reader for the file - - :param schema: optional schema when known - :param filepath: The path to the file - - :return: data frame of the read data - - """ - if self._findextesion(filepath) == "csv": - csv_obj = CSVReader() - df = csv_obj.read(filepath, self.spark_session, indexcol, schema) - return df - else: - print("This extension doesn't support") - - -class CSVReader: - """ - CSVReader class: Reads a csv file and send its content back - """ - - def __init__(self): - pass - - # Setters - def read(self, file_path, spark_session, indexcol=0, schema=None): - """ - Creates a dataframe from the csv file - - :param indexcol: if 1, create a tuple id column as auto increment - :param schema: optional schema of file if known - :param spark_session: The spark_session we created in Holoclean object - :param file_path: The path to the file - - :return: dataframe - """ - if schema is None: - df = spark_session.read.csv(file_path, header=True) - else: - df = spark_session.read.csv(file_path, header=True, schema=schema) - - if indexcol == 0: - return df - - index_name = GlobalVariables.index_name - - new_cols = df.schema.names + [index_name] - list_schema = [] - for index_attribute in range(len(df.schema.names)): - list_schema.append(StructField("_" + str(index_attribute), - df.schema[ - index_attribute].dataType, - True)) - list_schema.append( - StructField("_" + str(len(new_cols)), LongType(), True)) - - schema = StructType(list_schema) - ix_df = df.rdd.zipWithIndex().map( - lambda (row, ix): row + (ix + 1,)).toDF(schema) - tmp_cols = ix_df.schema.names - new_df = reduce(lambda data, idx: data.withColumnRenamed(tmp_cols[idx], - new_cols[idx]), - 
xrange(len(tmp_cols)), ix_df) - new_df = self.checking_string_size(new_df) - return new_df - - def checking_string_size(self, dataframe): - """ - This method checks if the dataframe has columns with strings with more - than 250 characters - - :param dataframe: the initial dataframe - :return: dataframe: a new dataframe without the columns with strings - with more than 250 characters - """ - - columns = set([]) - for row in dataframe.collect(): - for attribute in dataframe.columns: - if isinstance(row[attribute], unicode) and\ - len(row[attribute]) > 250: - columns.add(attribute) - if len(columns) > 0: - dataframe = self.ignore_columns(columns, dataframe) - return dataframe - - def ignore_columns(self, columns, dataframe): - """ - This method asks the user if he wants to drop a column which has a - string with more than 250 characters - - :param columns: a set of columns with strings with more - than 250 characters - :param dataframe: the dataframe that we want to change - - :return: dataframe: a new dataframe - """ - print("Holoclean cannot use dataframes with strings " - "more than 250 characters") - for column in columns: - answer = raw_input( - "The column " + column + " has a string of length " - "more than 250 characters.Do you want to drop this column" - " (y/n)?") - while answer != "y" and answer != "n": - answer = raw_input( - "the column " + column + " has a string of length" - "more than 250 characters." - "Do you want to drop this column" - "(y/n)?") - if answer == "y": - dataframe = self.drop_column(column, dataframe) - else: - print \ - "Holoclean cannot use dataframes with strings " \ - "more than 250 characters. please check your dataset" - exit(5) - return dataframe - - def drop_column(self, column, dataframe): - """ - This method drop a specific column from a dataframe - - :param column: a column that will be dropped - :param dataframe: the dataframe that will be change - - :return: dataframe - """ - return dataframe.drop(column) - - - From a5ac84c231e7427de46163cec19375e47dc83d62 Mon Sep 17 00:00:00 2001 From: gmichalo Date: Sun, 6 May 2018 20:05:24 -0400 Subject: [PATCH 6/8] creating interactive_calculation_accuracy method --- holoclean/holoclean.py | 11 +- holoclean/learning/accuracy.py | 223 +++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+), 1 deletion(-) diff --git a/holoclean/holoclean.py b/holoclean/holoclean.py index f4f7f335..2ad6bcd5 100644 --- a/holoclean/holoclean.py +++ b/holoclean/holoclean.py @@ -142,6 +142,12 @@ 'default': 1, 'type': int, 'help': 'Number of inferred values'}), + (('-inte', '--iteractive'), + {'metavar': 'Iteractive', + 'dest': 'iteractive', + 'default': 0, + 'type': int, + 'help': 'if 1 the user will evaluate performance of holoclean'}), (('-t', '--timing-file'), {'metavar': 'TIMING_FILE', 'dest': 'timing_file', @@ -514,7 +520,10 @@ def compare_to_truth(self, truth_path): """ acc = Accuracy(self, truth_path) - acc.accuracy_calculation() + if self.holo_env.iteractive: + acc.interactive_calculation_accuracy() + else: + acc.accuracy_calculation() def _ingest_dataset(self, src_path): """ diff --git a/holoclean/learning/accuracy.py b/holoclean/learning/accuracy.py index d5890297..2966b523 100644 --- a/holoclean/learning/accuracy.py +++ b/holoclean/learning/accuracy.py @@ -1,6 +1,7 @@ from holoclean.global_variables import GlobalVariables from holoclean.utils.reader import Reader from pyspark.sql.types import StructField, StructType, StringType, IntegerType +from pyspark.sql.functions import * class Accuracy: @@ -143,6 +144,228 
@@ def accuracy_calculation(self): str( incorrect_init_count)) + def interactive_calculation_accuracy(self, incorrect_init=None, + incorrect_inferred=None, incorrect_map=None): + """ + + This method gives the ability to the user to self-evaluate + holoclean's performance + + """ + if self.session.inferred_values is None: + self.session.holo_env.logger.error('No inferred values') + print ("The precision and recall cannot be calculated") + + else: + checkable_inferred_query = "SELECT I.tid,I.attr_name," \ + "I.attr_val " \ + "FROM " + \ + self.dataset.table_specific_name( + 'Inferred_Values') + " AS I" + + inferred = self.dataengine.query(checkable_inferred_query, 1) + + if inferred is None: + self.session.holo_env.logger.error('No checkable inferred ' + 'values') + print ("The precision and recall cannot be calculated") + return + + checkable_original_query = "SELECT I.tid,I.attr_name," \ + "I.attr_val FROM " + \ + self.dataset.table_specific_name( + 'Observed_Possible_Values_dk') + \ + " AS I " + + init = self.dataengine.query(checkable_original_query, 1) + + if self.session.holo_env.k_inferred > 1: + checkable_map_query = "SELECT I.tid,I.attr_name," \ + "I.attr_val "\ + "FROM " + \ + self.dataset.table_specific_name( + 'Inferred_map') + " AS I " + inferred_map = self.dataengine.query(checkable_map_query, 1) + + correct_count = 0 + incorrect_count = 0 + correct_map_count = 0 + incorrect_map_count = 0 + + first = 1 + for row in init.collect(): + answer = raw_input("Is the init value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + + str(row[1]) + "Correct (y/n or q to quit)?") + + while answer != "y" and answer != "n" and answer != "q": + print("Please answer with y, n or q \n") + answer = raw_input("Is the init value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + + str(row[1]) + + "Correct (y/n or q to quit)?") + + if answer == "y": + inferred_value = inferred.filter( + col("tid")== row[0]).filter(col("attr_name") == row[1]) + if inferred_value.collect()[0].attr_val == row[2]: + correct_count = correct_count + 1 + else: + incorrect_count = incorrect_count + 1 + inferred.subtract(inferred_value) + + if self.session.holo_env.k_inferred > 1: + inferred_value = inferred_map.filter( + col("tid") == row[0]).filter( + col("attr_name") == row[1]) + correct = 0 + for row_block in inferred_value.collect(): + if row_block["attr_val"] == row[2]: + correct = 1 + if correct: + correct_map_count = correct_map_count + 1 + else: + incorrect_map_count = incorrect_map_count +1 + inferred_map.subtract(inferred_value) + + elif answer == "n": + newRow = self.session.holo_env.spark_session.createDataFrame([row]) + if first: + incorrect_init = newRow + first = 0 + else: + incorrect_init = incorrect_init.union(newRow) + else: + break + + first = 1 + for row in inferred.collect(): + answer = raw_input("Is the inferred value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + str(row[1]) + + "Correct (y/n or q to quit)?") + + while answer != "y" and answer != "n" and answer != "q": + print("Please answer with y, n or q \n") + answer = raw_input("Is the inferred value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + + str(row[1]) + + "Correct (y/n or q to quit)?") + + if answer == "y": + correct_count = correct_count + 1 + elif answer == "n": + incorrect_count = incorrect_count + 1 + newRow = self.session.holo_env.spark_session.createDataFrame([row]) + if first: + 
incorrect_inferred = newRow + first = 0 + else: + incorrect_inferred = incorrect_inferred.union(newRow) + else: + break + inferred_count = correct_count + incorrect_count + + if inferred_count: + precision = float(correct_count) / float(inferred_count) + + print ("The top-" + str(self.session.holo_env.k_inferred) + + " precision is : " + str(precision)) + + if incorrect_init is not None and incorrect_inferred is not None: + + incorrect_init = incorrect_init.drop("attr_val", + "g_attr_val") + incorrect_inferred = \ + incorrect_inferred.drop("attr_val", "g_attr_val") + + uncorrected_inferred = incorrect_init.intersect( + incorrect_inferred) + uncorrected_count = uncorrected_inferred.count() + incorrect_init_count = incorrect_init.count() + + if incorrect_init_count: + recall = 1.0 - (float(uncorrected_count)/float( + incorrect_init_count)) + else: + recall = 1.0 + + print ("The top-" + str(self.session.holo_env.k_inferred) + + " recall is : " + str(recall) + " out of " + str( + incorrect_init_count)) + + # Report the MAP accuracy if you are predicting more than 1 value + if self.session.holo_env.k_inferred > 1: + + first = 1 + while inferred_map.count() > 0: + row = inferred_map.collect()[0] + inferred_map_block = inferred_map.filter( + col("tid") == row[0]).filter(col("attr_name") == row[1]) + correct_flag = 0 + for row_block in inferred_map_block.collect(): + first_block = 1 + answer = raw_input("Is the inferred value " + str(row_block[2]) + + " for the cell with row id " + + str(row_block[0]) + " and attribute " + str( + row_block[1]) + + "Correct (y/n or q to quit)?") + while answer != "y" and answer != "n" and answer != "q": + print("Please answer with y, n or q \n") + answer = raw_input( + "Is the inferred value " + str(row_block[2]) + + " for the cell with row id " + + str(row_block[0]) + " and attribute " + + str(row_block[1]) + "Correct (y/n or q to quit)?") + if answer == "y": + correct_flag = 1 + break + elif answer == "n": + pass + else: + break + if answer == "q": + break + if correct_flag: + correct_map_count = correct_map_count + 1 + else: + incorrect_map_count = incorrect_map_count + 1 + if first: + incorrect_map = inferred_map_block + first = 0 + else: + incorrect_map = incorrect_map.union(inferred_map_block) + inferred_map = inferred_map.subtract(inferred_map_block) + + inferred_map_count = correct_map_count + incorrect_map_count + + if inferred_map_count: + map_precision = float(correct_map_count) / float( + inferred_map_count) + print ("The MAP precision is : " + str(map_precision)) + + if incorrect_init is not None and incorrect_map is not None: + + incorrect_init = incorrect_init.drop("attr_val", + "g_attr_val") + incorrect_map = \ + incorrect_map.drop("attr_val", "g_attr_val") + + uncorrected_map = incorrect_init.intersect( + incorrect_map) + uncorrected_map_count = uncorrected_map.count() + incorrect_init_count = incorrect_init.count() + if incorrect_init_count: + recall = 1.0 - (float(uncorrected_map_count)/float( + incorrect_init_count)) + else: + recall = 1.0 + print ("The MAP recall is : " + str(recall) + " out of " + + str(incorrect_init_count)) + def read_groundtruth(self): """ From 0ba6f48576d7707e00b7362a0a5fdca69d7b1dbb Mon Sep 17 00:00:00 2001 From: gmichalo Date: Tue, 8 May 2018 16:05:56 -0400 Subject: [PATCH 7/8] updating method interactive_calculation_accuracy --- holoclean/learning/accuracy.py | 337 ++++++++++++++++++++++----------- 1 file changed, 222 insertions(+), 115 deletions(-) diff --git a/holoclean/learning/accuracy.py 
b/holoclean/learning/accuracy.py index 2966b523..96831a61 100644 --- a/holoclean/learning/accuracy.py +++ b/holoclean/learning/accuracy.py @@ -4,6 +4,7 @@ from pyspark.sql.functions import * + class Accuracy: def __init__( @@ -179,7 +180,13 @@ def interactive_calculation_accuracy(self, incorrect_init=None, init = self.dataengine.query(checkable_original_query, 1) + correct_count = 0 + incorrect_count = 0 + correct_map_count = 0 + incorrect_map_count = 0 + if self.session.holo_env.k_inferred > 1: + checkable_map_query = "SELECT I.tid,I.attr_name," \ "I.attr_val "\ "FROM " + \ @@ -187,37 +194,32 @@ def interactive_calculation_accuracy(self, incorrect_init=None, 'Inferred_map') + " AS I " inferred_map = self.dataengine.query(checkable_map_query, 1) - correct_count = 0 - incorrect_count = 0 - correct_map_count = 0 - incorrect_map_count = 0 - - first = 1 - for row in init.collect(): - answer = raw_input("Is the init value " + str(row[2]) + - " for the cell with row id " - + str(row[0]) + " and attribute " + - str(row[1]) + "Correct (y/n or q to quit)?") - - while answer != "y" and answer != "n" and answer != "q": - print("Please answer with y, n or q \n") + first = 1 + for row in init.collect(): answer = raw_input("Is the init value " + str(row[2]) + " for the cell with row id " + str(row[0]) + " and attribute " + - str(row[1]) - + "Correct (y/n or q to quit)?") + str(row[ + 1]) + "Correct (y/n or q to quit)?") - if answer == "y": - inferred_value = inferred.filter( - col("tid")== row[0]).filter(col("attr_name") == row[1]) - if inferred_value.collect()[0].attr_val == row[2]: - correct_count = correct_count + 1 - else: - incorrect_count = incorrect_count + 1 - inferred.subtract(inferred_value) + while answer != "y" and answer != "n" and answer != "q": + print("Please answer with y, n or q \n") + answer = raw_input("Is the init value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + + str(row[1]) + + "Correct (y/n or q to quit)?") - if self.session.holo_env.k_inferred > 1: - inferred_value = inferred_map.filter( + if answer == "y": + inferred_map_value = inferred_map.filter( + col("tid") == row[0]).filter( + col("attr_name") == row[1]) + if inferred_map_value.collect()[0].attr_val == row[2]: + correct_map_count = correct_map_count + 1 + else: + incorrect_map_count = incorrect_map_count + 1 + inferred_map.subtract(inferred_map_value) + inferred_value = inferred.filter( col("tid") == row[0]).filter( col("attr_name") == row[1]) correct = 0 @@ -225,121 +227,135 @@ def interactive_calculation_accuracy(self, incorrect_init=None, if row_block["attr_val"] == row[2]: correct = 1 if correct: - correct_map_count = correct_map_count + 1 + correct_count = correct_count + 1 else: - incorrect_map_count = incorrect_map_count +1 - inferred_map.subtract(inferred_value) - - elif answer == "n": - newRow = self.session.holo_env.spark_session.createDataFrame([row]) - if first: - incorrect_init = newRow - first = 0 - else: - incorrect_init = incorrect_init.union(newRow) - else: - break - - first = 1 - for row in inferred.collect(): - answer = raw_input("Is the inferred value " + str(row[2]) + - " for the cell with row id " - + str(row[0]) + " and attribute " + str(row[1]) + - "Correct (y/n or q to quit)?") - - while answer != "y" and answer != "n" and answer != "q": - print("Please answer with y, n or q \n") - answer = raw_input("Is the inferred value " + str(row[2]) + - " for the cell with row id " - + str(row[0]) + " and attribute " + - str(row[1]) + - "Correct (y/n or q to 
quit)?") - - if answer == "y": - correct_count = correct_count + 1 - elif answer == "n": - incorrect_count = incorrect_count + 1 - newRow = self.session.holo_env.spark_session.createDataFrame([row]) - if first: - incorrect_inferred = newRow - first = 0 - else: - incorrect_inferred = incorrect_inferred.union(newRow) - else: - break - inferred_count = correct_count + incorrect_count - - if inferred_count: - precision = float(correct_count) / float(inferred_count) - - print ("The top-" + str(self.session.holo_env.k_inferred) + - " precision is : " + str(precision)) - - if incorrect_init is not None and incorrect_inferred is not None: + incorrect_count = incorrect_count + 1 + inferred.subtract(inferred_value) - incorrect_init = incorrect_init.drop("attr_val", - "g_attr_val") - incorrect_inferred = \ - incorrect_inferred.drop("attr_val", "g_attr_val") - - uncorrected_inferred = incorrect_init.intersect( - incorrect_inferred) - uncorrected_count = uncorrected_inferred.count() - incorrect_init_count = incorrect_init.count() - - if incorrect_init_count: - recall = 1.0 - (float(uncorrected_count)/float( - incorrect_init_count)) + elif answer == "n": + newRow = self.session.holo_env.spark_session.createDataFrame( + [row]) + if first: + incorrect_init = newRow + first = 0 + else: + incorrect_init = incorrect_init.union(newRow) else: - recall = 1.0 - - print ("The top-" + str(self.session.holo_env.k_inferred) + - " recall is : " + str(recall) + " out of " + str( - incorrect_init_count)) - - # Report the MAP accuracy if you are predicting more than 1 value - if self.session.holo_env.k_inferred > 1: + break first = 1 - while inferred_map.count() > 0: - row = inferred_map.collect()[0] - inferred_map_block = inferred_map.filter( - col("tid") == row[0]).filter(col("attr_name") == row[1]) + while inferred.count() > 0: + row = inferred.collect()[0] + inferred_block = inferred.filter( + col("tid") == row[0]).filter( + col("attr_name") == row[1]) correct_flag = 0 - for row_block in inferred_map_block.collect(): - first_block = 1 - answer = raw_input("Is the inferred value " + str(row_block[2]) + - " for the cell with row id " - + str(row_block[0]) + " and attribute " + str( - row_block[1]) + - "Correct (y/n or q to quit)?") + for row_block in inferred_block.collect(): + answer = raw_input( + "Is the inferred value " + str(row_block[2]) + + " for the cell with row id " + + str(row_block[0]) + " and attribute " + str( + row_block[1]) + + "Correct (y/n or q to quit)?") while answer != "y" and answer != "n" and answer != "q": print("Please answer with y, n or q \n") answer = raw_input( - "Is the inferred value " + str(row_block[2]) + + "Is the inferred value " + str( + row_block[2]) + " for the cell with row id " + str(row_block[0]) + " and attribute " + - str(row_block[1]) + "Correct (y/n or q to quit)?") + str(row_block[ + 1]) + "Correct (y/n or q to quit)?") if answer == "y": correct_flag = 1 break elif answer == "n": pass else: + #quit break if answer == "q": break if correct_flag: - correct_map_count = correct_map_count + 1 + correct_count = correct_count + 1 else: - incorrect_map_count = incorrect_map_count + 1 + incorrect_count = incorrect_count + 1 + newRow = self.session.holo_env.spark_session.createDataFrame( + [row_block]) if first: - incorrect_map = inferred_map_block + incorrect_inferred = newRow first = 0 else: - incorrect_map = incorrect_map.union(inferred_map_block) - inferred_map = inferred_map.subtract(inferred_map_block) + incorrect_inferred = incorrect_inferred.union( + newRow) + inferred = 
inferred.subtract(inferred_block) + + inferred_count = correct_count + incorrect_count + + if inferred_count: + precision = float(correct_count) / float(inferred_count) + + print ("The top-" + str( + self.session.holo_env.k_inferred) + + " precision is : " + str(precision)) + if incorrect_init is not None and incorrect_inferred is not None: + + incorrect_init = incorrect_init.drop("attr_val", + "g_attr_val") + incorrect_inferred = \ + incorrect_inferred.drop("attr_val", + "g_attr_val") + + uncorrected_inferred = incorrect_init.intersect( + incorrect_inferred) + uncorrected_count = uncorrected_inferred.count() + incorrect_init_count = incorrect_init.count() + + if incorrect_init_count: + recall = 1.0 - ( + float(uncorrected_count) / float( + incorrect_init_count)) + else: + recall = 1.0 + + print ("The top-" + str( + self.session.holo_env.k_inferred) + + " recall is : " + str( + recall) + " out of " + str( + incorrect_init_count)) + first = 1 + + # Report the MAP accuracy + for row in inferred_map.collect(): + answer = raw_input("Is the inferred value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + str( + row[1]) + "Correct (y/n or q to quit)?") + + while answer != "y" and answer != "n" and answer != "q": + print("Please answer with y, n or q \n") + answer = raw_input( + "Is the inferred value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + + str(row[1]) + + "Correct (y/n or q to quit)?") + + if answer == "y": + correct_map_count = correct_map_count + 1 + elif answer == "n": + incorrect_map_count = incorrect_map_count + 1 + newRow = self.session.holo_env.spark_session.createDataFrame( + [row]) + if first: + incorrect_map = newRow + first = 0 + else: + incorrect_map = incorrect_map.union( + newRow) + else: + break inferred_map_count = correct_map_count + incorrect_map_count if inferred_map_count: @@ -365,6 +381,97 @@ def interactive_calculation_accuracy(self, incorrect_init=None, recall = 1.0 print ("The MAP recall is : " + str(recall) + " out of " + str(incorrect_init_count)) + else: + first = 1 + for row in init.collect(): + answer = raw_input("Is the init value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + + str(row[1]) + "Correct (y/n or q to quit)?") + + while answer != "y" and answer != "n" and answer != "q": + print("Please answer with y, n or q \n") + answer = raw_input("Is the init value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + + str(row[1]) + + "Correct (y/n or q to quit)?") + + if answer == "y": + inferred_value = inferred.filter( + col("tid")== row[0]).filter(col("attr_name") == row[1]) + if inferred_value.collect()[0].attr_val == row[2]: + correct_count = correct_count + 1 + else: + incorrect_count = incorrect_count + 1 + inferred.subtract(inferred_value) + + elif answer == "n": + newRow = self.session.holo_env.spark_session.createDataFrame([row]) + if first: + incorrect_init = newRow + first = 0 + else: + incorrect_init = incorrect_init.union(newRow) + else: + #quit + break + + first = 1 + for row in inferred.collect(): + answer = raw_input("Is the inferred value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and attribute " + str(row[1]) + + "Correct (y/n or q to quit)?") + + while answer != "y" and answer != "n" and answer != "q": + print("Please answer with y, n or q \n") + answer = raw_input("Is the inferred value " + str(row[2]) + + " for the cell with row id " + + str(row[0]) + " and 
attribute " + + str(row[1]) + + "Correct (y/n or q to quit)?") + + if answer == "y": + correct_count = correct_count + 1 + elif answer == "n": + incorrect_count = incorrect_count + 1 + newRow = self.session.holo_env.spark_session.createDataFrame([row]) + if first: + incorrect_inferred = newRow + first = 0 + else: + incorrect_inferred = incorrect_inferred.union(newRow) + else: + # quit + break + inferred_count = correct_count + incorrect_count + if inferred_count: + precision = float(correct_count) / float(inferred_count) + + print ("The top-" + str(self.session.holo_env.k_inferred) + + " precision is : " + str(precision)) + + if incorrect_init is not None and incorrect_inferred is not None: + incorrect_init = incorrect_init.drop("attr_val", + "g_attr_val") + incorrect_inferred = \ + incorrect_inferred.drop("attr_val", "g_attr_val") + + uncorrected_inferred = incorrect_init.intersect( + incorrect_inferred) + uncorrected_count = uncorrected_inferred.count() + incorrect_init_count = incorrect_init.count() + + if incorrect_init_count: + recall = 1.0 - (float(uncorrected_count)/float( + incorrect_init_count)) + else: + recall = 1.0 + + print ("The top-" + str(self.session.holo_env.k_inferred) + + " recall is : " + str(recall) + " out of " + str( + incorrect_init_count)) def read_groundtruth(self): From 1726704666bb09dfabd9a363b0b96726ec78b87d Mon Sep 17 00:00:00 2001 From: gmichalo Date: Sun, 13 May 2018 13:18:26 -0400 Subject: [PATCH 8/8] fixing typos on holoclean.py --- holoclean/holoclean.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/holoclean/holoclean.py b/holoclean/holoclean.py index 2ad6bcd5..588f4350 100644 --- a/holoclean/holoclean.py +++ b/holoclean/holoclean.py @@ -142,9 +142,9 @@ 'default': 1, 'type': int, 'help': 'Number of inferred values'}), - (('-inte', '--iteractive'), - {'metavar': 'Iteractive', - 'dest': 'iteractive', + (('-inte', '--interactive'), + {'metavar': 'Interactive', + 'dest': 'interactive', 'default': 0, 'type': int, 'help': 'if 1 the user will evaluate performance of holoclean'}), @@ -512,7 +512,7 @@ def repair(self): return self._create_corrected_dataset() - def compare_to_truth(self, truth_path): + def compare_to_truth(self, truth_path=None): """ Compares our repaired set to the truth prints precision and recall @@ -520,7 +520,7 @@ def compare_to_truth(self, truth_path): """ acc = Accuracy(self, truth_path) - if self.holo_env.iteractive: + if self.holo_env.interactive: acc.interactive_calculation_accuracy() else: acc.accuracy_calculation()