4
4
from functools import cached_property
5
5
from gzip import GzipFile
6
6
from urllib .request import urlopen
7
+ from bisect import bisect_left
7
8
8
9
9
10
class Score (object ):
@@ -17,7 +18,21 @@ def __init__(self, cve: str, epss: str, percentile: str):
17
18
18
19
class EPSS (object ):
19
20
def __init__ (self ) -> None :
20
- pass
21
+
22
+ url = 'https://epss.cyentia.com/epss_scores-current.csv.gz'
23
+
24
+ with urlopen (url ) as res :
25
+ dec = GzipFile (fileobj = res )
26
+ epss_scores_str : str = dec .read ().decode ("utf-8" )
27
+ epss_scores_list = epss_scores_str .split ('\n ' )
28
+
29
+ self ._download = epss_scores_list
30
+
31
+ scores = [row for row in csv .DictReader (self ._download [1 :])]
32
+
33
+ self ._byCVE = {row ['cve' ] : Score (row ['cve' ], row ['epss' ], row ['percentile' ]) for row in scores }
34
+
35
+ self ._sortedScores = sorted (self ._byCVE .values (),key = lambda x :x .percentile )
21
36
22
37
def scores (self ) -> list [Score ]:
23
38
"""Get all CVE's EPSS scores (downloaded data is cached in memory)
@@ -33,9 +48,7 @@ def scores(self) -> list[Score]:
33
48
Returns:
34
49
list[Score]: EPSS score's csv list
35
50
"""
36
- scores = [row for row in csv .DictReader (self ._download [1 :])]
37
- return [Score (row ['cve' ], row ['epss' ], row ['percentile' ])
38
- for row in scores ]
51
+ return list (self ._sortedScores )
39
52
40
53
def score (self , cve_id : str ) -> Score :
41
54
"""Get EPSS score and percentile
@@ -49,11 +62,8 @@ def score(self, cve_id: str) -> Score:
49
62
Returns:
50
63
Score | None: EPSS score percentile
51
64
"""
52
- rows = self ._filter_by_cve_id (cve_id )
53
- if len (rows ) == 1 :
54
- return rows [0 ]
55
- else :
56
- return None
65
+
66
+ return self ._byCVE .get (cve_id ,None )
57
67
58
68
def epss (self , cve_id : str ) -> float :
59
69
"""Get EPSS score
@@ -64,11 +74,12 @@ def epss(self, cve_id: str) -> float:
64
74
Returns:
65
75
float | None: EPSS score (0.0-1.0)
66
76
"""
67
- rows = self ._filter_by_cve_id (cve_id )
68
- if len (rows ) == 1 :
69
- return rows [0 ].epss
70
- else :
77
+
78
+ score = self ._byCVE .get (cve_id ,None )
79
+ if score is None :
71
80
return None
81
+ else :
82
+ return score .epss
72
83
73
84
def percentile (self , cve_id : str ) -> float :
74
85
"""Get EPSS percentile
@@ -79,11 +90,11 @@ def percentile(self, cve_id: str) -> float:
79
90
Returns:
80
91
float | None: EPSS percentile (0.0-1.0)
81
92
"""
82
- rows = self ._filter_by_cve_id (cve_id )
83
- if len (rows ) == 1 :
84
- return rows [0 ].percentile
85
- else :
93
+ score = self ._byCVE .get (cve_id ,None )
94
+ if score is None :
86
95
return None
96
+ else :
97
+ return score .percentile
87
98
88
99
def epss_gt (self , max : float ) -> list [Score ]:
89
100
"""Get CVEs with EPSS score greater or equal than the parameter
@@ -94,8 +105,9 @@ def epss_gt(self, max: float) -> list[Score]:
94
105
Returns:
95
106
list[Score] | None: EPSS score object list
96
107
"""
97
- rows = [r for r in filter (lambda x : x .epss >= max , self .scores ())]
98
- return rows
108
+ i = bisect_left (self ._sortedScores ,min ,key = lambda x :x .epss )
109
+
110
+ return list (self ._sortedScores [i :])
99
111
100
112
def percentile_gt (self , max : float ) -> list [Score ]:
101
113
"""Get CVEs with percentile greater or equal than the parameter
@@ -106,9 +118,9 @@ def percentile_gt(self, max: float) -> list[Score]:
106
118
Returns:
107
119
list[Score] | None: EPSS score object list
108
120
"""
109
- rows = [ r for r in
110
- filter ( lambda x : x . percentile >= max , self . scores ())]
111
- return rows
121
+ i = bisect_left ( self . _sortedScores , min , key = lambda x : x . percentile )
122
+
123
+ return list ( self . _sortedScores [ i :])
112
124
113
125
def epss_lt (self , min : float ) -> list [Score ]:
114
126
"""Get CVEs with EPSS score lower or equal than the parameter
@@ -119,8 +131,9 @@ def epss_lt(self, min: float) -> list[Score]:
119
131
Returns:
120
132
list[Score] | None: EPSS score object list
121
133
"""
122
- rows = [r for r in filter (lambda x : x .epss <= min , self .scores ())]
123
- return rows
134
+ i = bisect_left (self ._sortedScores [::- 1 ],min ,key = lambda x :1 - x .epss )
135
+
136
+ return list (self ._sortedScores [:len (self .sortedScores )- i ])
124
137
125
138
def percentile_lt (self , min : float ) -> list [Score ]:
126
139
"""Get CVEs with percentile lower or equal than the parameter
@@ -131,9 +144,9 @@ def percentile_lt(self, min: float) -> list[Score]:
131
144
Returns:
132
145
list[Score] | None: EPSS score object list
133
146
"""
134
- rows = [ r for r in
135
- filter ( lambda x : x . percentile <= min , self . scores ())]
136
- return rows
147
+ i = bisect_left ( self . _sortedScores [:: - 1 ], min , key = lambda x : 1 - x . percentile )
148
+
149
+ return list ( self . _sortedScores [: len ( self . sortedScores ) - i ])
137
150
138
151
def csv (self ) -> list [str ]:
139
152
"""Get csv data containing all epss scores.
@@ -150,18 +163,3 @@ def csv(self) -> list[str]:
150
163
"""
151
164
return self ._download
152
165
153
- def _filter_by_cve_id (self , cve_id : str ):
154
- cve_filter = filter (lambda x : x .cve == cve_id , self .scores ())
155
- rows = [row for row in cve_filter ]
156
- return rows
157
-
158
- @cached_property
159
- def _download (self ):
160
- url = 'https://epss.cyentia.com/epss_scores-current.csv.gz'
161
-
162
- with urlopen (url ) as res :
163
- dec = GzipFile (fileobj = res )
164
- epss_scores_str : str = dec .read ().decode ("utf-8" )
165
- epss_scores_list = epss_scores_str .split ('\n ' )
166
-
167
- return epss_scores_list
0 commit comments