1
1
import logging
2
+ from abc import ABC , abstractmethod
2
3
from collections import defaultdict
3
4
from datetime import datetime
4
5
10
11
11
12
logger = logging .getLogger (__name__ )
12
13
13
- alerts_cache = defaultdict (dict )
14
+ alerts_cache = defaultdict (lambda : defaultdict ( dict ) )
14
15
15
16
TASK_LOCK = "MARKET-SENTIMENT-LOCK"
16
17
17
18
19
+ class TaskConfig (ABC ):
20
+ @property
21
+ @abstractmethod
22
+ def increase_in_percentage (self ) -> float :
23
+ pass
24
+
25
+ @property
26
+ @abstractmethod
27
+ def frequency_in_minutes (self ) -> int :
28
+ pass
29
+
30
+ def __str__ (self ) -> str :
31
+ return self .__class__ .__name__
32
+
33
+
34
+ class EachSecondTaskConfig (TaskConfig ):
35
+ increase_in_percentage = 2.0
36
+ frequency_in_minutes = 1
37
+
38
+
39
+ class Each10SecondsTaskConfig (TaskConfig ):
40
+ increase_in_percentage = 4.0
41
+ frequency_in_minutes = 10
42
+
43
+
18
44
def _get_message (
45
+ key : str ,
19
46
coin : str ,
20
47
currency : str ,
21
48
increase_in_percentage : float ,
22
49
frequency_in_minutes : int ,
23
50
* ,
24
51
indicator : dict [str , int | float ],
25
52
) -> str | None :
26
- key , value = next (iter (indicator .items ()))
53
+ indicator_key , indicator_value = next (iter (indicator .items ()))
27
54
28
- alerts_cache_coin = alerts_cache [coin ]
55
+ alerts_cache_coin = alerts_cache [key ][ coin ]
29
56
30
57
if alerts_cache_coin ["currency" ] != currency :
31
58
logger .info (f"Mismatch in Currency: { alerts_cache_coin ['currency' ]} != { currency } for { coin } ." )
32
59
return None
33
60
34
- ratio = ((value / alerts_cache_coin [key ]) - 1 ) * 100
61
+ ratio = ((indicator_value / alerts_cache_coin [indicator_key ]) - 1 ) * 100
35
62
ratio_abs = abs (ratio )
36
63
37
64
if ratio_abs < increase_in_percentage :
38
65
logger .info (
39
- f"No new { key .upper ()} alerts for coin:{ coin } ; frequency_in_minutes:{ frequency_in_minutes } ratio: { ratio } ."
66
+ f"No new { indicator_key .upper ()} alerts for coin:{ coin } ; frequency_in_minutes:{ frequency_in_minutes } ratio: { ratio } ."
40
67
)
41
68
return None
42
69
43
70
verb = "increased" if ratio > 0 else "decreased"
44
- return f"The { key .upper ()} { verb } { ratio_abs :.2f} % in the last { frequency_in_minutes } minute(s)"
71
+ return f"The { indicator_key .upper ()} { verb } { ratio_abs :.2f} % in the last { frequency_in_minutes } minute(s)"
45
72
46
73
47
74
@huey .periodic_task (crontab (minute = "*/1" ))
48
75
@huey .lock_task (TASK_LOCK )
49
76
def bullish_or_bearish__1_min ():
50
- task (increase_in_percentage = 1.0 , frequency_in_minutes = 1 )
77
+ task (config = EachSecondTaskConfig () )
51
78
52
79
53
80
@huey .periodic_task (crontab (minute = "*/10" ))
54
81
@huey .lock_task (TASK_LOCK )
55
82
def bullish_or_bearish__10_min ():
56
- task (increase_in_percentage = 4.0 , frequency_in_minutes = 10 )
83
+ task (config = Each10SecondsTaskConfig () )
57
84
58
85
59
- def task (* , increase_in_percentage : float , frequency_in_minutes : int ):
86
+ def task (* , config : TaskConfig ):
60
87
name = datetime .today ().date ().strftime ("%Y-%m-%d" )
61
88
62
89
with redis .pipeline () as pipe :
@@ -91,7 +118,9 @@ def task(*, increase_in_percentage: float, frequency_in_minutes: int):
91
118
logger .error (f"Error when parsing { coin } dataset: { error } " )
92
119
break
93
120
else :
94
- if coin in alerts_cache :
121
+ key = f"{ name } { config } "
122
+
123
+ if key in alerts_cache and coin in alerts_cache [key ]:
95
124
96
125
messages = []
97
126
@@ -100,7 +129,12 @@ def task(*, increase_in_percentage: float, frequency_in_minutes: int):
100
129
{"price" : price },
101
130
):
102
131
message = _get_message (
103
- coin , currency , increase_in_percentage , frequency_in_minutes , indicator = indicator
132
+ key ,
133
+ coin ,
134
+ currency ,
135
+ config .increase_in_percentage ,
136
+ config .frequency_in_minutes ,
137
+ indicator = indicator ,
104
138
)
105
139
if message :
106
140
logger .info (message )
@@ -110,6 +144,7 @@ def task(*, increase_in_percentage: float, frequency_in_minutes: int):
110
144
111
145
for message in messages :
112
146
Alerts (
147
+ name = f"bullish-or-bearish-{ config .frequency_in_minutes } " ,
113
148
time_ts = time_ts ,
114
149
trade_symbol = coin ,
115
150
price = price ,
@@ -120,7 +155,7 @@ def task(*, increase_in_percentage: float, frequency_in_minutes: int):
120
155
else :
121
156
logger .info ("Initializing Alerts..." )
122
157
123
- alerts_cache [coin ] = {
158
+ alerts_cache [key ][ coin ] = {
124
159
"volume" : volume ,
125
160
"volume_buy_orders" : volume_buy_orders ,
126
161
"volume_sell_orders" : volume_sell_orders ,
0 commit comments