diff --git a/.DS_Store b/.DS_Store index 5847762a..e619f826 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.gitignore b/.gitignore index 07a00e9b..3ab3a2c3 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,6 @@ vnstock3/.DS_Store docs/.DS_Store dev/check_MSN_quarter_reports_df.xlsx dev/MSN_quarter_reports_df.xlsx +dev/vci_financial_test_notebook_Nov_1.ipynb +vnstock3/.DS_Store +.DS_Store diff --git a/vnstock3/explorer/vci/financial.py b/vnstock3/explorer/vci/financial.py index 5c28c48a..b3f4b562 100644 --- a/vnstock3/explorer/vci/financial.py +++ b/vnstock3/explorer/vci/financial.py @@ -185,10 +185,12 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l if lang == 'vi': ratio_df = ratio_df.rename(columns={'ticker': 'CP', 'yearReport': 'Năm', 'lengthReport': 'Kỳ'}) - index_part = ratio_df[['CP', 'Năm', 'Kỳ']] + index_cols = ['CP', 'Năm', 'Kỳ'] + index_part = ratio_df[index_cols] target_col_name = 'name' elif lang == 'en': - index_part = ratio_df[['ticker', 'yearReport', 'lengthReport']] + index_cols = ['ticker', 'yearReport', 'lengthReport'] + index_part = ratio_df[index_cols] target_col_name = 'en_name' # Create a dictionary to map field_name to report type @@ -200,16 +202,10 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l mapping_specific = pd.DataFrame() mapping_ct = mapping_df[mapping_df['com_type_code'] == 'CT'] all_columns_mapping = pd.concat([mapping_specific, mapping_ct]).drop_duplicates(subset='field_name', keep='first') - all_columns_mapping = self.duplicated_columns_handling(all_columns_mapping, target_col_name=target_col_name) # remove all values that com_type_code is not 'CT' or self.com_type_code all_columns_mapping = all_columns_mapping[all_columns_mapping['com_type_code'].isin(['CT', self.com_type_code])].copy() - # # Filter the mapping DataFrame based on company type code. Split mapping into two parts: 'CT' and company-specific mapping - # mapping_specific = mapping_df[mapping_df['com_type_code'] == self.com_type_code] - # mapping_ct = mapping_df[mapping_df['com_type_code'] != self.com_type_code] - # all_columns_mapping = pd.concat([mapping_specific, mapping_ct]).drop_duplicates(subset='field_name', keep='first') - original_columns = ratio_df.columns # There are many columns in ratio_df that are not in mapping_df, they are not tied to the company specific mapping or the 'CT' mapping orphan_columns = [col for col in original_columns if col not in mapping_df['field_name'].values and col not in index_part.columns] @@ -219,46 +215,18 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l columns_translation = all_columns_mapping.set_index('field_name')[target_col_name].to_dict() # Create a dictionary to map field_name to order column_order = all_columns_mapping.set_index(target_col_name)['order'].to_dict() + + # Drop the orphan columns which are not in the mapping DataFrame + ratio_df = ratio_df.drop(columns=orphan_columns) + # apply sorting to the columns of ratio_df by the order in the column_order dictionary + ratio_df = ratio_df[sorted(ratio_df.columns, key=lambda x: column_order.get(x, 0))] - translated_ratio_df = ratio_df.copy() - # look up the value in translated_ratio_df and replace it with the value in column_translation - translated_ratio_df = translated_ratio_df.rename(columns=columns_translation) - # Remove orphan columns - translated_ratio_df = translated_ratio_df.drop(columns=orphan_columns) - - # # Get the list of duplicated columns names in translated_ratio_df.columns - # duplicated_columns = translated_ratio_df.columns[translated_ratio_df.columns.duplicated()].tolist() - - type_mapping = dict(zip(mapping_df[target_col_name], mapping_df['type'])) - - # Organize columns in ratio_df into different reports based on type - reports = {} - for field, report_type in type_mapping.items(): - if report_type not in reports: - reports[report_type] = [field] - else: - reports[report_type].append(field) - - # Rename columns in ratio_df based on mapping_df's 'name' - name_mapping = dict(zip(mapping_df['field_name'], mapping_df[target_col_name])) - if show_log: - logger.debug(f"Name mapping: {name_mapping}") - ratio_df.rename(columns=name_mapping, inplace=True) - # Reorder columns based on 'order' in mapping_df - ratio_df = ratio_df[sorted(ratio_df.columns, key=lambda x: int(column_order.get(x, 0)))] - - ## Handle duplicate columns - # ratio_df = self.handle_duplicate_columns(original_columns, ratio_df) - - # Create DataFrames for each report type with exception handling + report_type_field_dict = all_columns_mapping.groupby('type')['field_name'].apply(list).to_dict() + + # Create DataFrames for each report type, using columns name as code names without translation report_dfs = {} - for report_type, columns in reports.items(): - try: - # Filter the DataFrame only for columns that exist in ratio_df - filtered_columns = [col for col in columns if col in ratio_df.columns] - report_dfs[report_type] = ratio_df[filtered_columns] - except KeyError as e: - print(f"Failed to create DataFrame for {report_type} due to missing columns: {e}") + for report_type, fields in report_type_field_dict.items(): + report_dfs[report_type] = ratio_df[index_cols + fields] # Define the primary report types primary_reports = [ @@ -271,6 +239,17 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l primary_dfs = {key: report_dfs[key] for key in primary_reports} other_reports = {key: report_dfs[key] for key in report_dfs if key not in primary_reports} + + # Create a dictionary to map field_name to report type + name_mapping = dict(zip(mapping_df['field_name'], mapping_df[target_col_name])) + + # Translate the column names + for key in primary_dfs: + primary_dfs[key].columns = [name_mapping.get(col, col) for col in primary_dfs[key].columns] + + for key in other_reports: + other_reports[key].columns = [name_mapping.get(col, col) for col in other_reports[key].columns] + # Merge all other reports into a single DataFrame with MultiIndex columns merged_other_reports = pd.concat(other_reports.values(), axis=1, keys=other_reports.keys()) @@ -290,7 +269,7 @@ def multi_level_columns_translate(col, translation_dict): if lang == 'en': # Apply the columns_translate function to all columns merged_other_reports.columns = pd.MultiIndex.from_tuples( - [multi_level_columns_translate(col, columns_translate) for col in merged_other_reports.columns] + [multi_level_columns_translate(col, columns_translation) for col in merged_other_reports.columns] ) return primary_dfs, merged_other_reports