-
Notifications
You must be signed in to change notification settings - Fork 0
/
spreadsheet_io.py
171 lines (135 loc) · 6.11 KB
/
spreadsheet_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""This module defines functions used for importing and exporting data,
as well as formatting at either end of the process.
"""
from settings import *
import pandas as pd
import numpy as np
def process_raw_spreadsheet(df: pd.DataFrame) -> tuple:
    """Split a raw spreadsheet DataFrame into its data and metadata parts.

    Takes a DataFrame read from an Excel file (with ``header=None``),
    separates the header (metadata) from the body (data), and returns
    a tuple of (data, metadata).

    Arguments:
    df -- a DataFrame containing raw spreadsheet data read from an Excel file.
        The formatting of the Excel file should follow these rules:
        - The header contains key, value pairs, where the key is the
          name of the field (e.g. "Tenants", "Address", "Phone Number", etc.)
          and the value is the value (e.g. "John Doe", "1600 Pennsylvania
          Avenue", "555-0143").
        - Each key should be the first cell in a row, with its value in
          the next cells. Keys with multiple values should have each value
          in its own cell.
        - At the end of the header, there is a row containing the equation
          '=CHAR(02)' in its first column. This row marks that the data
          begins in the next row.
        - Every column containing data must be named; any unnamed columns
          will be expunged.

    Outputs:
    data -- a DataFrame containing the data, with missing values
        replaced with 0
    metadata -- a dict containing the metadata
    """
    # The end of the header / start of data can be indicated by any of the
    # alternative string sequences defined in the settings.
    stx_index = -1
    for sequence in settings["name-map"]["_stx_"]["alt"]:
        # Build the match mask once per sequence instead of twice.
        mask = df[0] == sequence
        if mask.any():
            stx_index = df.index[mask][0]
            break
    # No end-of-header sequence present: assume there is no metadata.
    # (With stx_index == -1, the slices below make row 0 the column
    # headers and rows 1+ the data, matching the no-header layout.)
    if stx_index == -1:
        file_header = df[:0]
    else:
        file_header = df[:stx_index]

    metadata = {}
    # If there is no header, this loop body never runs and metadata is
    # returned as an empty dict.
    for _, row_series in file_header.iterrows():
        # Drop NaN cells and convert to a plain list to prevent weirdness.
        row = list(row_series.dropna())
        if len(row) == 0:
            # Skip empty rows.
            continue
        key = disambiguate(row[0])
        if len(row) == 1:
            # Tag present but its value is missing.
            metadata[key] = None
        elif len(row) == 2:
            # Single-valued tag.
            metadata[key] = row[1]
        else:
            # Multi-valued tag.
            metadata[key] = row[1:]

    # The row after the STX / end-of-header row contains the column names
    # of the data.
    column_headers = df.loc[stx_index + 1]
    # Contingency for empty spreadsheets: start from a row of zeros so the
    # DataFrame construction below cannot break, and the spreadsheet can
    # still be parsed.
    spreadsheet_matrix = np.zeros(len(column_headers))
    try:
        spreadsheet_matrix = df.loc[(stx_index + 2):]
    except ValueError:
        print("Empty spreadsheet.")

    # Create a DataFrame containing the actual spreadsheet data.
    # If any of the metadata tags contain lists longer than the number of
    # columns in the spreadsheet data, the data rows fill with NaN to
    # match the length, leaving NaN-named columns.
    data = pd.DataFrame(
        np.array(spreadsheet_matrix),
        columns=np.array(column_headers)
    )
    # Drop every column whose header is NaN. The conditional prevents an
    # error when there are no NaN-named columns (i.e. metadata lists do
    # not exceed data columns in length).
    data.columns = data.columns.fillna('NaN')
    if "NaN" in data.columns:
        data = data.drop("NaN", axis=1)

    for column in data.columns:
        if "datetime" in str(data[column].dtype):
            # Remove the time component from datetime columns, then render
            # them as strings in the configured date format.
            data[column] = pd.to_datetime(data[column]).dt.date
            data[column] = pd.DatetimeIndex(data[column]).strftime(settings["date-format"])
            # Forward-fill missing dates (i.e. assume rows with an
            # unspecified date are from the last specified date).
            # .ffill() replaces the deprecated fillna(method='ffill'),
            # which was removed in pandas 3.0.
            data[column] = data[column].ffill()

    # Fill any remaining NaN entry with 0 — because this is legal stuff,
    # we want to keep every row regardless of missing values, and 0
    # causes minimal errors when computing balances.
    data = data.fillna(0)
    # Disambiguate column headers.
    data.rename(columns={col: disambiguate(col) for col in data.columns}, inplace=True)
    return (data, metadata)
def load_spreadsheets() -> dict:
    """Load and process data from all spreadsheet sources in the settings.

    Reads each path in settings["spreadsheet_paths"], processes it with
    process_raw_spreadsheet, and tags the metadata with the file path,
    file name, and fallback name/type entries.

    Outputs:
    loaded_spreadsheets -- a dict mapping each spreadsheet's name tag to
        its (data, metadata) tuple. Unreadable or malformed files are
        reported on stdout and skipped.
    """
    loaded_spreadsheets = {}
    for path in settings["spreadsheet_paths"]:
        try:
            raw_data = pd.read_excel(path, header=None)
        except FileNotFoundError:
            print(f"No file found at {path}")
            continue
        except Exception:
            # Any other read failure (bad format, permissions, ...):
            # skip the problematic file, don't abort the whole load.
            # (Was a bare `except:`, which also traps KeyboardInterrupt.)
            print(f"Invalid file: '{path}'")
            continue
        try:
            # Try to process the spreadsheet.
            data, metadata = process_raw_spreadsheet(raw_data)
            metadata[map_name("_filepath_")] = path
            # BUG FIX: this line previously assigned to _filepath_ again,
            # clobbering the path and leaving _filename_ unset, so the
            # lookup below raised KeyError (silently swallowed by the
            # except). The file-name stem belongs under _filename_.
            metadata[map_name("_filename_")] = path.split("/")[-1].replace(".xlsx", "")
            if map_name("_name_") not in metadata:
                # If the spreadsheet doesn't have a name tag, set it to
                # the file name.
                metadata[map_name("_name_")] = metadata[map_name("_filename_")]
            if map_name("_type_") not in metadata:
                metadata[map_name("_type_")] = "_blank_"
            meta_name = metadata[map_name("_name_")]
            loaded_spreadsheets[meta_name] = (data, metadata)
        except Exception:
            # Best-effort: report and move on to the next file.
            print(f"Error converting file: '{path}'")
            continue
    return loaded_spreadsheets