@@ -21,7 +21,8 @@ class DataTransformationConfig:
21
21
holidays :str = os .path .join ("artifacts" , "holidays.csv" )
22
22
processed_data :str = os .path .join ("artifacts" , "processed_data.csv" )
23
23
train_data :str = os .path .join ("artifacts" , "train_data.csv" )
24
- test_data :str = os .path .join ("artifacts" , "test_data.csv" )
24
+ test_data :str = os .path .join ("artifacts" , "test_data.joblib" )
25
+ test_data_covariates :str = os .path .join ("artifacts" , "test_data_covariates.joblib" )
25
26
timeseries_data :str = os .path .join ("artifacts" , "timeseries_data.joblib" )
26
27
covariates :str = os .path .join ("artifacts" , "covariates.joblib" )
27
28
@@ -85,7 +86,7 @@ def integrate_data(self):
85
86
indices = processed_data [((processed_data ["date" ] == date ) & (processed_data ["city" ] == city ))].index
86
87
processed_data ["is_holiday" ][indices ] = 1
87
88
88
- processed_data .to_csv (self .datatransformationconfig .processed_data )
89
+ processed_data .to_csv (self .datatransformationconfig .processed_data , index = False )
89
90
90
91
logging .info ("data integration complete" )
91
92
@@ -112,8 +113,8 @@ def split_data(self, number_of_test_days = 15):
112
113
train_data = processed_data .iloc [:split_index + 1 , :]
113
114
test_data = processed_data .iloc [split_index + 1 :, :]
114
115
115
- train_data .to_csv (self .datatransformationconfig .train_data )
116
- test_data . to_csv ( self .datatransformationconfig .test_data )
116
+ train_data .to_csv (self .datatransformationconfig .train_data , index = False )
117
+ joblib . dump ( test_data , self .datatransformationconfig .test_data )
117
118
118
119
logging .info ("data split complete" )
119
120
@@ -131,7 +132,7 @@ def transform_data(self):
131
132
logging .info ("executing transform_data function" )
132
133
try :
133
134
train_data = pd .read_csv (self .datatransformationconfig .train_data )
134
- test_data = pd . read_csv (self .datatransformationconfig .test_data )
135
+ test_data = joblib . load (self .datatransformationconfig .test_data )
135
136
136
137
train_data .drop (["id" , "city" , "store_type" , "state" , "cluster" ], axis = 1 , inplace = True )
137
138
test_data .drop (["id" , "city" , "store_type" , "state" , "cluster" ], axis = 1 , inplace = True )
@@ -160,6 +161,19 @@ def transform_data(self):
160
161
covariates [cov ].loc [date , :] = [np .NaN ] * covariates [cov ].shape [1 ]
161
162
covariates [cov ] = covariates [cov ].ffill ()
162
163
164
+ logging .info ("reformatting test_data" )
165
+
166
+ test_sales = {}
167
+ test_covariates = {}
168
+ for group , data_slice in test_data .groupby (by = ["store_nbr" , "family" ]):
169
+ data_slice .set_index ("date" , drop = True , inplace = True )
170
+ test_covariate = data_slice [["onpromotion" , "dcoilwtico" , "is_holiday" ]]
171
+ test_sales_series = data_slice ["sales" ]
172
+ test_sales [group ] = test_sales_series
173
+ test_covariates [str (group )] = test_covariate
174
+
175
+ test_data = pd .DataFrame (data = test_sales )
176
+
163
177
logging .info ("detecting and removing outliers from different series" )
164
178
165
179
temp = series_dataset .apply (lambda x : hampel (x , window_size = 7 , n_sigma = 3.0 ).filtered_data )
@@ -178,24 +192,34 @@ def transform_data(self):
178
192
constant_features .append (feature )
179
193
features_to_keep = set (series_dataset .columns ).difference (set (constant_features ))
180
194
series_dataset = series_dataset [features_to_keep ]
181
- for constant_feature in constant_features :
182
- test_data [~ ((test_data ["store_nbr" ] == constant_feature [0 ]) & (test_data ["family" ] == constant_feature [1 ]))]
195
+
183
196
series_dataset = series_dataset [sorted (series_dataset .columns )]
197
+ test_data = test_data [series_dataset .columns ]
184
198
185
- logging .info ("converting sales series and covariates into Darta TimeSeries" )
199
+ logging .info ("converting sales series and covariates into Darts TimeSeries" )
186
200
187
201
series_dataset .set_index (pd .to_datetime (series_dataset .index ), inplace = True )
202
+ test_data .set_index (pd .to_datetime (test_data .index ), inplace = True )
203
+
188
204
timeseries_data = TimeSeries .from_dataframe (series_dataset )
205
+ test_data = TimeSeries .from_dataframe (test_data )
189
206
190
207
for cov_key in covariates :
191
208
temp_cov = covariates [cov_key ]
192
209
temp_cov .set_index (pd .to_datetime (temp_cov .index ), inplace = True )
193
210
covariates [cov_key ] = TimeSeries .from_dataframe (temp_cov )
194
211
212
+ for cov_key in test_covariates :
213
+ temp_cov = test_covariates [cov_key ]
214
+ temp_cov .set_index (pd .to_datetime (temp_cov .index ), inplace = True )
215
+ test_covariates [cov_key ] = TimeSeries .from_dataframe (temp_cov )
216
+
195
217
joblib .dump (timeseries_data , self .datatransformationconfig .timeseries_data )
196
218
joblib .dump (covariates , self .datatransformationconfig .covariates )
219
+ joblib .dump (test_data , self .datatransformationconfig .test_data )
220
+ joblib .dump (test_covariates , self .datatransformationconfig .test_data_covariates )
197
221
198
- logging .info ("saved timeseries_data and covariates to artifacts" )
222
+ logging .info ("saved timeseries_data, test_data and covariates to artifacts" )
199
223
logging .info (">>> DATA TRANSFORMATION COMPLETE <<<" )
200
224
201
225
except Exception as e :
0 commit comments