-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
213 lines (172 loc) · 6.99 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import logging
import os
import sys
import time
from http import HTTPStatus
from typing import Dict, List, Union
import pandas as pd
import pandas_ta as ta
from exceptions import (APIResponseError, APIStatusCodeError, DataError,
ExchangeError, IndicatorDataError, TelegramError)
from exchange import Binance, DataTransferObject, Exchange
from settings import (BASE_DIR, BINANCE_API_KEY, BINANCE_PRIVATE_KEY, BINANCE_MARKET_TYPE,
DATA_LIMIT, DATA_MARKET_ID, DATA_RETRY_TIME, DATA_TIMEFRAME, ORDER_SIZE,
RSI_PERIOD, RSI_LOWER, RSI_UPPER, TELEGRAM_CHAT_ID, TELEGRAM_ENDPOINT,
TELEGRAM_TOKEN, TELEGRAM_MESSAGE)
from telegram import Telegram
# Settings handed to the Telegram client: chat id, API endpoint and bot token.
telegram_config = dict(
    chat_id=TELEGRAM_CHAT_ID,
    endpoint=TELEGRAM_ENDPOINT,
    token=TELEGRAM_TOKEN,
)
telegram = Telegram(telegram_config)

# Settings handed to the Binance client: credentials, market type, market id,
# timeframe and data limit.
exchange_config = dict(
    api_key=BINANCE_API_KEY,
    private_key=BINANCE_PRIVATE_KEY,
    market_type=BINANCE_MARKET_TYPE,
    market_id=DATA_MARKET_ID,
    timeframe=DATA_TIMEFRAME,
    limit=DATA_LIMIT,
)
exchange = Binance(exchange_config)
def check_position_response(
    response: List[Dict[str, Union[int, float, str]]]
) -> Dict[Union[float, str], Dict[str, Union[float, str]]]:
    """Validate the exchange positions response and index it by symbol.

    Args:
        response: Expected to be a list of dicts, each carrying the
            'symbol', 'positionAmt' and 'entryPrice' keys.

    Returns:
        A dict mapping every symbol to
        ``{'amount': <positionAmt>, 'entry_price': <entryPrice>}``.

    Raises:
        TypeError: If ``response`` is not a list or one of its items is
            not a dict.
        APIResponseError: If a required key is absent or set to ``None``.
    """
    if not isinstance(response, list):
        raise TypeError(f'API response error: response = {response}')

    required_keys = ('symbol', 'positionAmt', 'entryPrice')
    by_symbol = {}
    for entry in response:
        if not isinstance(entry, dict):
            raise TypeError(f'API response error, response = {response}')
        symbol, amount, entry_price = (entry.get(key) for key in required_keys)
        # Every missing field is reported with the same error, so a single
        # combined check is enough.
        if symbol is None or amount is None or entry_price is None:
            raise APIResponseError(
                f'API response error, response = {response}'
            )
        by_symbol[symbol] = {'amount': amount, 'entry_price': entry_price}
    return by_symbol
def rsi(data: DataTransferObject, period) -> DataTransferObject:
    """Add RSI indicator values to the data object.

    Builds a DataFrame from ``data['close']``, computes RSI over its column
    0 with ``ta.rsi(..., length=period)``, rounds the result to two decimals
    and stores it as a list under ``data['indicators']['rsi']`` (replacing
    any existing ``data['indicators']`` value).

    Args:
        data: Data object providing a ``'close'`` entry.
        period: Value passed to ``ta.rsi`` as ``length``.

    Returns:
        The same ``data`` object, updated in place.

    Raises:
        IndicatorDataError: If any step of the calculation fails.
    """
    try:
        logging.info('Calculation of the value of indicators')
        closes = pd.DataFrame(data['close'])
        rsi_values = ta.rsi(closes[0], length=period).round(2)
        data['indicators'] = {'rsi': list(rsi_values)}
    except Exception as exc:
        raise IndicatorDataError(f'Indicator data conversion error: {exc}') from exc
    return data
def transaction_decision(exchange: Exchange, data: DataTransferObject, position: float) -> None:
    """Decide from the last two RSI values whether to place an order.

    A buy signal is the RSI moving from at-or-below ``RSI_LOWER`` to above
    it; a sell signal is the RSI moving from at-or-above ``RSI_UPPER`` to
    below it. A BUY is submitted only when ``position <= 0`` and a SELL
    only when ``position >= 0``.

    Args:
        exchange: Exchange client forwarded to ``new_order``.
        data: Data object holding ``data['indicators']['rsi']``.
        position: Current position amount.
    """
    rsi_series = data['indicators']['rsi']
    latest = rsi_series[-1]
    prior = rsi_series[-2]

    crossed_above_lower = prior <= RSI_LOWER < latest
    crossed_below_upper = prior >= RSI_UPPER > latest

    if crossed_above_lower and position <= 0:
        new_order(exchange, 'BUY', position)
    if crossed_below_upper and position >= 0:
        new_order(exchange, 'SELL', position)
def new_order(exchange: Exchange, direction: str, amount: float) -> None:
    """Compute the order size and submit a market order.

    The size is the absolute value of ``amount`` plus ``ORDER_SIZE``. The
    order is printed to the console, sent through
    ``exchange.create_market_order`` and then reported to Telegram.

    Args:
        exchange: Exchange client used to submit the order.
        direction: Order side passed to the exchange ('BUY' or 'SELL' in
            the calls visible in this file).
        amount: Current position amount.
    """
    # NOTE(review): adding the current position size presumably lets one order
    # close the opposite position and open a new one - confirm.
    order_size = abs(float(amount)) + ORDER_SIZE
    print(f'Submitting a market order: {direction}, размер заявки {order_size}')
    exchange.create_market_order(direction, amount=order_size)
    notice = f'*** Submitting a market order: {direction}, size {order_size} ***'
    telegram.send_message(notice)
def send_message(message: str) -> None:
    """Send a message to Telegram and verify the HTTP status.

    Args:
        message: Text to send.

    Raises:
        APIStatusCodeError: If the response status code is not 200 OK.
    """
    reply = telegram.send_message(message)
    if reply.status_code == HTTPStatus.OK:
        return
    details = (
        'Invalid server response: '
        f'http code = {reply.status_code}; '
        f'reason = {reply.reason}; '
        f'content = {reply.text}'
    )
    raise APIStatusCodeError(details)
def main():
    """Run the bot's endless polling loop.

    On start-up the Telegram and exchange credentials are checked; if either
    check fails, the error is sent to Telegram, logged and the process exits.

    Each cycle then:
      1. requests the current position from the exchange;
      2. validates it and builds a status message, which is sent to Telegram
         only when it differs from the previously sent one;
      3. requests market data, computes RSI and lets
         ``transaction_decision`` place an order if needed - this step is
         skipped when the position could not be determined in step 2;
      4. sleeps for ``DATA_RETRY_TIME`` and clears the console.

    Raises:
        ExchangeError: If the position request fails with ``exchange.error``.
        DataError: If the market data request fails with ``exchange.error``.
        IndicatorDataError: If the RSI calculation fails.
    """
    if not (telegram.check_tokens() and exchange.check_tokens()):
        error_message = (
            'Missing required environment variables:\n'
            'BINANCE_API_KEY, BINANCE_PRIVATE_KEY, BINANCE_MARKET_TYPE, '
            'TELEGRAM_CHAT_ID, TELEGRAM_ENDPOINT, TELEGRAM_TOKEN.\n'
            'The program was stopped'
        )
        telegram.send_message(error_message)
        logging.critical(error_message)
        sys.exit(error_message)

    current_position = None
    prev_position = current_position

    while True:
        # Reset every cycle: a failed position check must never leave `pos`
        # unbound (first cycle) or carrying the previous cycle's value.
        pos = None

        try:
            logging.info(f'Position request {DATA_MARKET_ID}')
            response = exchange.get_position()
        except exchange.error as exc:
            raise ExchangeError(f'Getting position error: {exc}') from exc

        try:
            logging.info('Checking the API Response')
            position = check_position_response(response)[DATA_MARKET_ID]
            pos = float(position['amount'])
            # NOTE(review): a zero amount is labelled 'SHORT' here - confirm
            # whether a flat position should be reported differently.
            current_position = TELEGRAM_MESSAGE.format(
                symbol=DATA_MARKET_ID,
                amount=position['amount'],
                entry_price=position['entry_price'],
                direction=('LONG' if pos > 0 else 'SHORT')
            )
            if current_position == prev_position:
                logging.debug(
                    f'No position updates for the {DATA_MARKET_ID}'
                )
        except Exception as exc:
            error_message = f'Program crash: {exc}'
            current_position = error_message
            logging.exception(error_message)

        try:
            # Notify only on change so identical status messages are not
            # re-sent every cycle.
            if current_position != prev_position:
                telegram.send_message(current_position)
                prev_position = current_position
        except TelegramError as exc:
            error_message = f'Program crash: {exc}'
            logging.exception(error_message)

        if pos is None:
            # The position is unknown, so placing an order now would be
            # based on no (or outdated) information; wait for the next cycle.
            logging.warning(
                f'Position for {DATA_MARKET_ID} is unknown, '
                'skipping the trading step'
            )
        else:
            try:
                logging.info(f'Request data for a {DATA_MARKET_ID}')
                data = exchange.get_data()
            except exchange.error as exc:
                raise DataError(f'Getting data error: {exc}') from exc

            indicator_data = rsi(data, RSI_PERIOD)
            transaction_decision(exchange, indicator_data, pos)

        time.sleep(DATA_RETRY_TIME)
        os.system('cls||clear')
if __name__ == '__main__':
    # Log at INFO level to both <BASE_DIR>/output.log and stdout, using one
    # shared record format, then start the bot.
    log_format = (
        '%(asctime)s [%(levelname)s] - '
        '(%(filename)s).%(funcName)s:%(lineno)d - %(message)s'
    )
    log_file = os.path.join(BASE_DIR, 'output.log')
    log_handlers = [
        logging.FileHandler(log_file),
        logging.StreamHandler(sys.stdout),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=log_handlers,
    )
    main()