-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsteam_groups.py
179 lines (130 loc) · 4.75 KB
/
steam_groups.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import json
import os
import pathlib
import time
import requests
def get_data_folder():
return 'data/'
def get_api_key_filename():
return get_data_folder() + 'api_key.txt'
def get_member_list_filename():
return get_data_folder() + 'members_list.txt'
def get_api_key_environment_variable():
# NB: this is mostly useful for unit tests with continuous integration. The API key cannot be stored in a text file,
# and has to be stored as an environement variable.
#
# Caveat: you have to fill in the name chosen for your "repository secret" on Github!
return 'API_KEY'
def load_api_key():
api_key_filename = get_api_key_filename()
try:
with open(api_key_filename) as f:
data = f.readlines()
api_key = data[0]
except FileNotFoundError:
print('The file containing your private API key could not be found.')
env_var = get_api_key_environment_variable()
# Reference: https://stackoverflow.com/a/4907053/376454
api_key = os.getenv(key=env_var, default=None)
if api_key is None:
print(
'An environement variable with your private API key could not be found. Queries are bound to fail.',
)
else:
print(
'Your private API key was found in an environment variable ({}).'.format(
env_var,
),
)
return api_key
def load_member_list():
member_list_filename = get_member_list_filename()
with open(member_list_filename) as f:
data = f.readlines()
member_list = [int(steam_id.strip()) for steam_id in data]
return member_list
def get_library_folder(include_free_games=True):
if include_free_games:
data_path = get_data_folder() + 'library_with_f2p/'
else:
data_path = get_data_folder() + 'library/'
pathlib.Path(data_path).mkdir(parents=True, exist_ok=True)
return data_path
def get_steam_api_library_url():
return 'https://api.steampowered.com/IPlayerService/GetOwnedGames/v1/'
def download_user_data(
steam_id,
output_folder,
steam_api_url,
query_count=0,
include_free_games=True,
):
rate_limits = get_steam_api_rate_limits()
library_filename = output_folder + str(steam_id) + '.json'
try:
with open(library_filename, encoding="utf8") as in_json_file:
data_as_json = json.load(in_json_file)
print(f'Loading data from cache for Steam-ID {steam_id}')
except FileNotFoundError:
if query_count >= rate_limits['max_num_queries']:
cooldown_duration = rate_limits['cooldown']
print(
'Number of queries {} reached. Cooldown: {} seconds'.format(
query_count,
cooldown_duration,
),
)
time.sleep(cooldown_duration)
query_count = 0
print(f"Downloading and caching data for Steam-ID {steam_id}")
data_request = {}
api_key = load_api_key()
if api_key is not None:
data_request['key'] = api_key
data_request['steamid'] = steam_id
if include_free_games:
data_request['include_played_free_games'] = 1
else:
data_request['include_played_free_games'] = 0
response = requests.get(steam_api_url, params=data_request)
data_as_json = response.json()
query_count += 1
with open(library_filename, 'w', encoding="utf8") as cache_json_file:
# Enforce double-quotes instead of single-quotes. Reference: https://stackoverflow.com/a/8710579/
data_as_str = json.dumps(data_as_json)
print(data_as_str, file=cache_json_file)
return data_as_json, query_count
def download_user_library(steam_id, query_count=0, include_free_games=True):
output_folder = get_library_folder(include_free_games)
steam_api_url = get_steam_api_library_url()
library_data, query_count = download_user_data(
steam_id,
output_folder,
steam_api_url,
query_count,
include_free_games,
)
return library_data, query_count
def get_steam_api_rate_limits():
# Objective: return the rate limits of Steam API.
rate_limits = {
'max_num_queries': 150,
'cooldown': (5 * 60) + 10, # 5 minutes plus a cushion
}
return rate_limits
def batch_download(include_free_games=True):
member_list = load_member_list()
query_count = 0
for steam_id in member_list:
_, query_count = download_user_library(
steam_id,
query_count,
include_free_games,
)
return
def main():
include_free_games = True
batch_download(include_free_games)
return True
if __name__ == '__main__':
main()