-
Notifications
You must be signed in to change notification settings - Fork 1
/
__init__.py
341 lines (272 loc) · 11.6 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# filterms.py
# authored by krunch3r76 (https://www.github.com/krunch3r76)
# license GPL 3.0
import yapapi
import os, sys # debug sys
from yapapi import rest
from typing import Optional
from yapapi.strategy import (
SCORE_REJECTED,
SCORE_NEUTRAL,
SCORE_TRUSTED,
MarketStrategy,
DecreaseScoreForUnconfirmedAgreement,
LeastExpensiveLinearPayuMS,
WrappingMarketStrategy,
)
from yapapi.props import com
from decimal import Decimal
import json
from .provider_filter import ProviderFilter
import inspect
import logging
from collections.abc import Iterable
def _print_err(*args, **kwargs):
"""wrapper around print to route to stderr"""
kwargs["file"] = sys.stderr
print(*args, **kwargs)
# originally a method from yapapi.golem::Golem (yapapi 0.9)
# creates sane defaults for a marketstrategy
def _initialize_default_strategy() -> DecreaseScoreForUnconfirmedAgreement:
"""Create a default strategy and register it's event consumer"""
base_strategy = LeastExpensiveLinearPayuMS(
max_fixed_price=Decimal("1.0"),
max_price_for={
com.Counter.CPU: Decimal("0.2"),
com.Counter.TIME: Decimal("0.1"),
},
)
strategy = DecreaseScoreForUnconfirmedAgreement(base_strategy, 0.5)
# self._event_consumers.append(strategy.on_event)
return strategy
class _ProviderInfo:
"""store name@provider_id as hashable along with other relevant offer info"""
def __init__(self, name, provider_id, cpu_capabilities):
self.__name = name
self.__provider_id = provider_id
self.__cpu_capabilities = cpu_capabilities
@property
def name(self):
return self.__name
@property
def provider_id(self):
return self.__provider_id
@property
def cpu_capabilities(self):
return self.__cpu_capabilities
def check_cpu_capabilities(self, features):
"""ensure all capabilities are present on the offer"""
assert isinstance(features, Iterable)
if len(features) == 0:
return True
return all(map(lambda feature: feature in self.cpu_capabilities, features))
def __repr__(self):
return f"{self.name}@{self.provider_id}"
def __hash__(self):
return hash(repr(self))
def __eq__(self, other):
return self.__hash__() == other.__hash__()
def fuzzy_matches(self, name_or_partial_id):
"""determine if either name or partial address corresponds to internals"""
whether_name_matches = False
whether_partial_id_matches = False
if name_or_partial_id == self.name:
whether_name_matches = True
if self.provider_id.startswith(name_or_partial_id):
whether_partial_id_matches = True
return whether_name_matches ^ whether_partial_id_matches
class FilterProviderMS(ProviderFilter):
"""
:_VERBOSE verbose logging (determined by presence of env:FILTERMSVERBOSE
:_previously_rejected a set of previously rejected providers
:_seen_providers a set of all unique providers seen thus far
"""
# __init__ <
def __init__(self, base_strategy=None, features=None):
# - _convert_string_array_to_list -<
def _convert_string_array_to_list(stringarray):
"""take a string that represents a single value or a bracketed array
and return enclosed in a python list
inputs 1) a stringarray bounded by [ ] with unquoted list elements and
converts to a list of strings
or 2) an unbounded string that is a single word which is placed as a string in a list
then returns the list or an empty list
"""
error = False
done = False
thelist = []
if stringarray == None:
error = True
if not error and not isinstance(stringarray, str):
error = True
if not error and not done:
if len(stringarray) == 0:
error = True
if not error and not done:
if stringarray[0] != "[":
thelist.append(stringarray)
done = True
if not error and not done:
if len(stringarray) < 3:
error = True # a input bracketed string must have at least one element (character) to listify
if not error and not done: # not done implies begins with '['
if stringarray[-1] == "]":
thelist.append(stringarray)
else:
error = True
if not error and not done:
thelist = stringarray[1:-1].split(",")
return thelist if not error else []
# /- _convert_string_array_to_list ->
def setup_logger(self):
self._logger = logging.getLogger("filterms")
stream_handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter("[filterms] %(levelname)s - %(message)s")
stream_handler.setFormatter(formatter)
self._logger.addHandler(stream_handler)
self._logger.setLevel(logging.INFO)
if self._VERBOSE:
self._logger.setLevel(logging.DEBUG)
if not self._VERBOSE:
self._logger.info(
f"TO SEE ALL REJECTIONS SET THE ENVIRONMENT VARIABLE FILTERMSVERBOSE TO 1",
)
self._VERBOSE = os.environ.get("FILTERMSVERBOSE")
self._provider_fuzzy_bl = _convert_string_array_to_list(
os.environ.get("GNPROVIDER_BL")
)
self._provider_fuzzy_wl = _convert_string_array_to_list(
os.environ.get("GNPROVIDER")
)
setup_logger(self)
self._logger.debug(f"fuzzy whitelist is {self._provider_fuzzy_wl}")
if features == None:
features = _convert_string_array_to_list(os.environ.get("GNFEATURES"))
elif isinstance(features, str):
features = [features]
elif not isinstance(features, Iterable):
self._logger.warning(
"Features argument is neither a string nor an iterable: IGNORED!"
)
features = list()
self._features = features
if base_strategy == None:
base_strategy = _initialize_default_strategy()
super().__init__(base_strategy, is_allowed=self._is_allowed)
self._previously_rejected = set()
self._seen_providers = set()
# print(
# f"[filterms] TO SEE ALL REJECTIONS SET THE ENVIRONMENT VARIABLE FILTERMSVERBOSE TO 1",
# file=sys.stderr,
# )
self._providerInfo_bl = set()
self._providerInfo_wl = set()
self._providersSeenSoFar = set()
self._providersBlacklistedSoFar = set()
self._logger.debug(f"filtering providers with cpu features in {self._features}")
async def _is_allowed(self, provider_id):
try:
def _lookup_provider_by_id(self, provider_id):
"""find matching providerInfo"""
matched = list(
filter(
lambda providerInfo: provider_id == providerInfo.provider_id,
self._providersSeenSoFar,
)
)
if len(matched) < 1: # should be 1...
emsg = "fatal error! provider_id not stored internally!"
self._logger.critical(emsg)
# raise Exception(emsg)
return matched[0]
matched_on_blacklist = False
matched_on_whitelist = True
matched_on_features = False
# matched_on_secondary_criteria = True # TODO
allowed = False
providerInfo = _lookup_provider_by_id(self, provider_id)
# if providerInfo in self._providersBlacklistedSoFar:
# self._logger.debug("UNEXPECTED REPEAT OFFER!")
# return False
# check blacklist
matching_bl = list(
filter(
lambda providerInfo: provider_id == providerInfo.provider_id,
self._providerInfo_bl,
)
)
matched_on_blacklist = len(matching_bl) == 1
if matched_on_blacklist:
self._logger.debug(
f"{providerInfo} rejected due to blacklist membership"
)
elif len(self._provider_fuzzy_wl) > 0: # whitelisting activated
if len(self._providerInfo_wl) > 0:
# check whitelist
matched_on_whitelist = False # ensure not on list rejected
matching_wl = list(
filter(
lambda providerInfo: provider_id
== providerInfo.provider_id,
self._providerInfo_wl,
)
)
matched_on_whitelist = len(matching_wl) > 0
else:
matched_on_whitelist = (
False # whitelisting requested but not a match
)
self._logger.debug(f"rejected {providerInfo}, not on whitelist")
matched_on_features = providerInfo.check_cpu_capabilities(self._features)
if not matched_on_features:
self._logger.debug(
f"{providerInfo} rejected due to missing cpu feature(s)"
)
# self._logger.debug(
# f"the following provider did not have the required feature(s):"
# f" {providerInfo}"
# f"\n{providerInfo.cpu_capabilities}"
# )
except Exception as e:
self._logger.critical(f"_is_allowed threw an unhandled exception: {e}")
raise e
allowed = (
(not matched_on_blacklist)
and matched_on_whitelist
and matched_on_features
# and matched_on_secondary_criteria
)
if not allowed:
self._providersBlacklistedSoFar.add(provider_id)
return allowed
# / __init__ >
async def score_offer(self, offer) -> float:
def _extract_provider_info_from_offer(offer):
return offer.props["golem.node.id.name"], offer.issuer
try:
providerInfo = _ProviderInfo(
*_extract_provider_info_from_offer(offer),
offer.props.get(
"golem.inf.cpu.capabilities", []
), # kludge to handle missing field
)
name = _extract_provider_info_from_offer(offer)
self._providersSeenSoFar.add(providerInfo)
if any(
map(
lambda candidate: providerInfo.fuzzy_matches(candidate),
self._provider_fuzzy_bl,
)
):
# add to internal set of blacklisted providers
self._providerInfo_bl.add(providerInfo)
elif any(
map(
lambda candidate: providerInfo.fuzzy_matches(candidate),
self._provider_fuzzy_wl,
)
):
self._providerInfo_wl.add(providerInfo)
except Exception as e:
self._logger.critical(f"an unhandled exception: {e}, occurred")
return await super().score_offer(offer)