train_single_feature.py
import os

# Silence TensorFlow's C++ log spam. This must be set before TensorFlow is
# imported (single_feature_model imports it), or it has no effect.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from scipy.special import expit
from tqdm import tqdm

from data_extractor import dataloader
from single_feature_model import Model

def calculate_reward(inventory, closing_price, sold_profit=0):
    '''
    An improved reward function (based on feedback from the interim report)
    that takes into account the value of the portfolio itself, not just the
    profit gained from selling the stocks.
    '''
    # Number of stocks currently held in the portfolio
    num_stocks = len(inventory)
    # Total value of the portfolio at today's closing price
    total_portfolio_value = num_stocks * closing_price
    # Subtract the initial investment from the portfolio value to get the unrealised profit
    initial_investment = sum(inventory)
    total_profit = total_portfolio_value - initial_investment
    # Add the profit realised from selling stocks (if any)
    total_profit += sold_profit
    # Clip the reward to the range [-1, 1]
    total_profit = np.clip(total_profit, -1, 1)
    return total_profit  # This is the reward
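
# Worked example (illustrative prices, not from the dataset): holding two
# shares bought at $10.00 and $12.00 with today's close at $11.50 gives a
# portfolio value of 2 * 11.50 = 23.00 against an investment of 22.00, so the
# unrealised profit is 1.00, which sits exactly at the upper clip bound.
#   calculate_reward([10.0, 12.0], 11.5)   # -> 1.0
#   calculate_reward([10.0, 12.0], 10.5)   # -> -1.0 (21.00 - 22.00, clipped)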
def format_price(n):
    '''
    Formats a number into a dollar string with 2 decimal places,
    e.g. format_price(-3.5) -> "-$3.50".
    '''
    # Convert n from a numpy scalar to a plain float
    n = float(n)
    if n < 0:
        return "-${0:.2f}".format(abs(n))
    else:
        return "${0:.2f}".format(abs(n))
def state_creator(data, timestep, window_size):
    '''
    Converts the raw price window into differences between consecutive
    closing prices, i.e. the price changes over time. This allows the model
    to predict buy/sell/hold decisions rather than the price itself.
    '''
    starting_id = timestep - window_size + 1
    if starting_id >= 0:
        windowed_data = data[starting_id:timestep + 1]
    else:
        # Not enough history yet: pad the window by repeating the first price
        windowed_data = -starting_id * [data[0]] + list(data[0:timestep + 1])
    state = []
    for i in range(window_size - 1):
        # expit is the logistic sigmoid function; it squashes the price
        # differences into (0, 1) and avoids overflow errors associated
        # with large diffs in stock price
        # https://i.stack.imgur.com/WY61Z.png
        state.append(expit(windowed_data[i + 1] - windowed_data[i]))
    return np.array([state])
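
# Worked example (illustrative prices): the consecutive differences of
# [100.0, 101.0, 99.0] are +1.0 and -2.0, so
#   state_creator([100.0, 101.0, 99.0], timestep=2, window_size=3)
# returns [[expit(1.0), expit(-2.0)]] ~= [[0.731, 0.119]]; values above 0.5
# mean the price rose that day, values below 0.5 mean it fell.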
def train_model(data, model, window_size, episodes, stock, batch_size=32):
    for episode in range(1, episodes + 1):  # 1-indexed for printing purposes
        print("Episode: {}/{}".format(episode, episodes))
        state = state_creator(data, 0, window_size + 1)
        total_profit = 0
        model.inventory = []
        for t in tqdm(range(len(data))):
            # print("Timestep: {}/{}".format(t, len(data)))
            action = model.trade(state)
            if t == len(data) - 1:
                # On the last timestep there is no next state, so reuse the current one
                next_state = state
            else:
                next_state = state_creator(data, t + 1, window_size + 1)
            reward = 0
            if action == 1:  # Buy stock
                model.inventory.append(data[t])
                reward = calculate_reward(model.inventory, data[t])
                # print("Buy: {}".format(format_price(data[t])))
            elif action == 2 and len(model.inventory) > 0:  # Sell stock
                bought_price = model.inventory.pop(0)
                # reward = max(data[t] - bought_price, 0)
                profit = data[t] - bought_price
                reward = calculate_reward(model.inventory, data[t], profit)
                total_profit += profit
                # print("Sell: {} | Profit: {}".format(format_price(data[t]), format_price(profit)))
            elif action == 0:  # Hold stock
                reward = calculate_reward(model.inventory, data[t])
                # print("Hold: {}".format(format_price(data[t])))
            # On the last timestep the episode is done
            done = t == len(data) - 1
            model.memory.append((state, action, reward, next_state, done))
            state = next_state
            if done:
                print("--------------------------------")
                print("Total Profit: {}".format(format_price(total_profit)))
                print("--------------------------------")
            # Once we have enough experience in memory, train the model in batches
            if len(model.memory) > batch_size:
                model.batch_train(batch_size)
    print("Total Profit: {}".format(format_price(total_profit)))
    print("Saving model...")
    os.makedirs("models", exist_ok=True)
    model.model.save(f"models/{stock.lower()}.h5")
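
# The replay step above delegates to Model.batch_train, which lives in
# single_feature_model.py and is not shown here. A minimal sketch of a
# standard DQN-style replay update, assuming the Model keeps a `memory`
# buffer, a Keras `model`, and a discount factor `gamma` (all assumptions,
# not confirmed by this file):
#
#   def batch_train(self, batch_size):
#       batch = list(self.memory)[-batch_size:]  # most recent experiences
#       for state, action, reward, next_state, done in batch:
#           target = reward
#           if not done:
#               # Bellman target: reward plus discounted best future Q-value
#               target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
#           target_q = self.model.predict(state)
#           target_q[0][action] = target
#           self.model.fit(state, target_q, epochs=1, verbose=0)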
def test_model(data, model, window_size, stock, start_date, end_date):
    '''
    Tests the trained model by having it trade over a set test period.
    For this we don't use the replay memory, and we don't need the reward.
    '''
    state = state_creator(data, 0, window_size + 1)
    total_profit = 0
    model.inventory = []
    profits = []
    for t in range(len(data)):
        print("Timestep: {}/{}".format(t, len(data)))
        action = model.trade(state, is_eval=True)
        print("Action: {}".format(action))
        if t == len(data) - 1:
            # On the last timestep there is no next state, so reuse the current one
            next_state = state
        else:
            next_state = state_creator(data, t + 1, window_size + 1)
        if action == 1:  # Buy stock
            model.inventory.append(data[t])
            print("Buy: {}".format(format_price(data[t])))
        elif action == 2 and len(model.inventory) > 0:  # Sell stock
            bought_price = model.inventory.pop(0)
            total_profit += data[t] - bought_price
            print("Sell: {} | Profit: {}".format(format_price(data[t]), format_price(data[t] - bought_price)))
        elif action == 0:  # Hold stock
            print("Hold: {}".format(format_price(data[t])))
        state = next_state
        # Record the cumulative profit at each timestep
        profits.append(total_profit)
    print(profits)
    print("Overall Profit Over Testing Period: {}".format(format_price(total_profit)))
    # Plot the cumulative profit over time
    plt.plot(profits)
    plt.xlabel('Time (Days)')
    plt.ylabel('Profit (USD)')
    plt.title(f'Profit Over Time for {stock} From {start_date} to {end_date}')
    plt.legend([f'{stock}'])
    os.makedirs('plots', exist_ok=True)
    plt.savefig(f'plots/{stock.lower()}.png')
    plt.show()
if __name__ == "__main__":
    window_size = 10
    episodes = 5
    stock = 'AMZN'
    data = dataloader(stock, 'data/', '2021-01-01', '2023-01-01')
    # We only want the closing price
    data = data['Close'].values
    batch_size = 32
    trader = Model(window_size)
    trader.model = trader.model_builder()
    # trader.model.summary()
    train_model(data, trader, window_size, episodes, stock, batch_size)

    ### Testing the model ###
    # Rebuild the model and load the trained weights
    trader = Model(window_size)
    trader.model = trader.model_builder()
    trader.model.load_weights(f"models/{stock.lower()}.h5")
    # Start and end dates for the testing period
    test_start = '2023-01-02'
    test_end = '2023-03-02'
    test_data = dataloader(stock, 'data/', test_start, test_end)
    test_data = test_data['Close'].values
    # Test the model
    test_model(test_data, trader, window_size, stock, test_start, test_end)
    quit()  # Remove this line to also run the next-day signal demo below

    ### Using the model to predict the trading signal for tomorrow ###
    # Current date and the date a week ago
    today = datetime.today().strftime('%Y-%m-%d')
    week_ago = (datetime.strptime(today, '%Y-%m-%d') - timedelta(days=7)).strftime('%Y-%m-%d')
    # Closing prices for the last week (fewer points than the window;
    # state_creator pads the missing history with the first price)
    test_data = dataloader(stock, 'data/', week_ago, today)
    test_data = test_data['Close'].values
    # Build the state from the most recent timestep and ask the model for an
    # action (greedy evaluation, no exploration)
    state = state_creator(test_data, len(test_data) - 1, window_size + 1)
    action = trader.trade(state, is_eval=True)
    actions = {
        0: "Hold",
        1: "Buy",
        2: "Sell"
    }
    print("Action for {} on {}: {}".format(stock, today, actions[action]))