"""All Puffer experiments.
This file defines experiments in the form:
```
name = "puffer"
defintions = {
"<path>": function(config)
}
```
That is, each experiment is identified by the path where its outputs are
stored, and is a function that takes a (user) config as argument.
When running these experiments with the `experiment_helpers`,
outputs are stored in `config.output_directory / puffer / <path>`.
Implementation note:
Below, we use functools.partial to prepare the functions. It allows to
partially set some function arguments (hence the name), so we can easily
parametrize one experiment function in different ways to define a range
of experiments.
"""
# pylint: disable=invalid-name
from functools import partial
from typing import Callable, Dict
import pandas as pd
from experiment_helpers.framework import ParametrizedExperiments
from . import config
from .implementation import (analysis, data, deployment, memory, replay,
                             selection_analysis)
replay_group = "puffer-replay"
replay_exps: Dict[str, Callable] = {}
# Memory size of 1M, like puffer.
memsize = 1000000
# Limit how many samples we process at once.
# The majority of days have fewer than 2M samples, so most days we can process
# in one go. However, there are a few days with a _lot_ of samples (up to 9M).
# While not an issue per se, this causes Memento to need a lot of memory.
# As other people also need the server, we can limit how much we process :D
max_insert = 2000000
# Default settings we use for Puffer Memento.
default_memento = partial(
    memory.PufferMemory,
    size=memsize, batching_size=256,
    bw=0.1, temperature=0.01,
    insert_chunksize=max_insert,
    random_fraction=0.0, random_forget=0.0,
)
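# Note (for readers): keyword arguments given later override the pre-bound
# ones, so e.g. `partial(default_memento, temperature=0.0)` behaves like
# `default_memento` with only the temperature changed. The parameter sweeps
# below rely on this to vary one setting at a time.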
default_alternatives = {
    "confidence":
        partial(memory.PufferConfidence, size=memsize, temperature=0.1),
    "loss":
        partial(memory.PufferLoss, size=memsize, temperature=0.1),
    "loss_batched":
        partial(memory.PufferLoss, size=memsize, temperature=0.1,
                batching_size=256),
    "classcounts":
        partial(memory.PufferClassCounts, size=memsize, temperature=0.01),
    "stalls":
        partial(memory.PufferStalled, size=memsize, temperature=0.01),
}
# Default retraining threshold.
default_threshold = 0.1 # 10%
_today = pd.to_datetime("today").strftime("%Y-%m-%d")
data_first_and_last = {
    # Use data from 2021 for experiments.
    'comparison': ("2021-01-01", "2022-06-01"),
    # Evaluate long-term results until cutoff before submission.
    'eval': ("2020-04-09", "2023-08-28"),
    # Current memento deployment, including latest data.
    'deployment': ("2022-05-01", _today),
}
data_ranges = {
    key: pd.date_range(start_day, end_day, freq='D')
    for key, (start_day, end_day) in data_first_and_last.items()
}
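# Illustration only: each entry is a daily pd.DatetimeIndex that includes both
# endpoints, e.g. for the comparison range defined above:
#
#     assert data_ranges['comparison'][0] == pd.Timestamp("2021-01-01")
#     assert data_ranges['comparison'][-1] == pd.Timestamp("2022-06-01")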
# Data download.
# ==============
# We put downloads into a separate group of commands because they need to be
# run independently of the other experiments: running both at the same time
# can lead to experiments being run with incomplete data.
download_group = "puffer-download"
download_exps: Dict[str, Callable] = {}
# Note: The path here does not matter for outputs, as download stores the
# data into the `puffer_data_directory` specified in the config.
# Always make sure we have data for Fugu-Feb downloaded!
download_exps['fugufeb'] = partial(data.download_data, day="fugufeb")
for days in data_ranges.values():
    for day in days:
        download_exps[f"{day.strftime('%Y-%m-%d')}"] = partial(
            data.download_data, day=day)
# Data preprocessing.
# ===================
# We need a lot of data in the correct format for the TTP models.
# We can preprocess them to speed up processing at the expense of disk space.
preprocess_group = "puffer-preprocess"
preprocess_exps: Dict[str, Callable] = {}
for days in data_ranges.values():
    for day in days:
        preprocess_exps[f"{day.strftime('%Y-%m-%d')}"] = partial(
            data.preprocess_data, day=day)
# Data analysis.
# ==============
analysis_group = "puffer-analysis"
analysis_exps: Dict[str, Callable] = {}
for day in data_ranges['eval']:
    analysis_exps[f"{day.strftime('%Y-%m-%d')}"] = partial(
        analysis.analyze_day, day=day)
# Comparison of different parameters.
# ===================================
# Start and end day of comparison runs.
# Each run covers half a year. The first day is only used for training and the
# last day only for evaluation, so it's okay if runs overlap on those days.
comparison_runs = [
    ("2021-01-01", "2021-07-01"),
    ("2021-07-01", "2022-01-01"),
    ("2022-01-01", "2022-06-01"),
]
for start, end in comparison_runs:
    run = f"{start}_{end}"

    # Data stats like streamtime for the comparison run.
    # --------------------------------------------------
    replay_exps[f'stats/{run}'] = partial(
        replay.data_overview, startday=start, endday=end)

    # pylint: disable=dangerous-default-value
    # (we only read from the dict).
    def _parametrize_replay(config_overrides=None, replay_kwargs=None,
                            memcls=None, **mem_kwargs):
        """The replay function takes a lot of parameters.

        This function is a helper to parametrize it.
        """
        if memcls is None:
            memcls = partial(default_memento, **mem_kwargs)
        else:
            memcls = partial(memcls, **mem_kwargs)
        # By default, we retrain once per week.
        # We need to retrain to assess the memory quality, but retraining
        # daily is unnecessarily expensive to compute.
        _rkws = dict(
            retrain_from="fugu_feb",  # Speed up training.
            train_threshold=0.0, train_interval=7
        )
        if replay_kwargs:
            _rkws.update(replay_kwargs)
        replaycls = partial(replay.PufferDataReplay, **_rkws)
        return partial(
            replay.evaluate,
            startday=start, endday=end,  # pylint: disable=cell-var-from-loop
            replaycls=replaycls, memorycls=memcls,
            config_overrides=config_overrides,
        )
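
    # For illustration, a call like `_parametrize_replay(temperature=0.0)` is
    # roughly equivalent to
    #     partial(replay.evaluate, startday=start, endday=end,
    #             replaycls=partial(replay.PufferDataReplay,
    #                               retrain_from="fugu_feb",
    #                               train_threshold=0.0, train_interval=7),
    #             memorycls=partial(default_memento, temperature=0.0),
    #             config_overrides=None)
    # i.e. a callable that only needs the user config, as the framework expects.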
    # Default parameters as baseline.
    # -------------------------------
    # Default parameters.
    replay_exps[f'comparison/baseline/default/{run}'] = _parametrize_replay()
    replay_exps[f'comparison/baseline/random/{run}'] = _parametrize_replay(
        random_fraction=1.0,  # totally random mem.
        random_forget=0.1,  # with same forgetting rate as puffer.
    )
    # Memento (T=0), i.e. no noise rejection. Only keep lowest density.
    replay_exps[f'comparison/baseline/deterministic/{run}'] = \
        _parametrize_replay(temperature=0.0)
    # Parameter sweeps.
    # -----------------
    # Different temperature settings for random scaling.
    for temp in [1e-5, 1e-4, 1e-3, 5e-3, 1e-2, 5e-2, 1e-1, 1, 1e1, 1e2]:
        replay_exps[f'comparison/temperature/{temp:.0e}/{run}'] = \
            _parametrize_replay(temperature=temp)
    # Different bandwidths.
    for bw in [0.001, 0.01, 0.05, 0.08, 0.1, 0.125, 0.2, 0.5, 1, 10]:
        replay_exps[f'comparison/bandwidth/{bw:g}/{run}'] = \
            _parametrize_replay(bw=bw)
    # Different batching sizes (not too small or it takes forever).
    for bsize in [128, 256, 512, 1024, 2048]:
        replay_exps[f'comparison/batchsize/{bsize}/{run}'] = \
            _parametrize_replay(batching_size=bsize)
        # Not used: this is still incredibly slow and does not match batching.
        # Also compare to subsampling the memory.
        # Batching reduces the size by n**2, subsampling by n, so adjust.
        # Only for the default batchsize, because this is unoptimized and slow.
        # if bsize == 256:
        #     replay_exps[f'comparison/distancesampling/{bsize}/{run}'] = \
        #         _parametrize_replay(
        #             batching_size=1,  # No batching, sample instead.
        #             distance_sample_fraction=1.0/bsize**2
        #         )
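        # (Arithmetic note for the sketch above: with bsize == 256 the
        # fraction would be 1 / 256**2 = 1 / 65536, roughly 1.5e-5.)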
    # Using a euclidean distance instead of BBDR + distribution distance.
    # In the paper we only show one of the two, as they perform basically
    # equally.
    replay_exps[f'comparison/euclidean/{run}'] = \
        _parametrize_replay(distances='euclidean')
    replay_exps[f'comparison/euclidean+bbdr/{run}'] = \
        _parametrize_replay(distances='euclidean+bbdr')
    # Optional: Only consider input or output space.
    # Both are beneficial; not shown in the paper; uncomment to run.
    # for distances in ("both", "input", "output"):
    #     replay_exps[f'comparison/subspace/{distances}/{run}'] = \
    #         _parametrize_replay(distances=distances)
    # Ablation: compare the smart memory to a larger, but naive, memory.
    replay_exps[f'comparison/memsize/double/{run}'] = _parametrize_replay(
        size=2*memsize,  # double the size.
        random_fraction=1.0,  # totally random mem.
        random_forget=0.1,  # with same forgetting rate as puffer.
    )
    # Test alternative selection methods to density.
    # These methods also support different temperatures to control randomness.
    altexp = "comparison/metric"
    for alt_name in ["loss", "stalls", "confidence", "classcounts"]:
        alt_cls = default_alternatives[alt_name]
        # For stalls we need to load them; save the overhead otherwise.
        replay_kws = dict(load_stalls=True) if alt_name == "stalls" else {}
        for temp in [1e-4, 1e-3, 1e-2, 1e-1, 1, 1e1]:
            replay_exps[f'{altexp}/{alt_name}/{temp:.0e}/{run}'] = \
                _parametrize_replay(memcls=alt_cls, temperature=temp,
                                    replay_kwargs=replay_kws)
    # For the loss-based alternative, some additional datapoints are helpful.
    for temp in [1e2, 1e3]:
        replay_exps[f'{altexp}/loss/{temp:.0e}/{run}'] = \
            _parametrize_replay(memcls=default_alternatives['loss'],
                                temperature=temp)
    # Compare training decision metrics.
    # ----------------------------------
    # Coverage-based (Memento) vs loss-based.
    for decision in ["coverage", "loss"]:
        for threshold in [0.05, 0.1, 0.2]:
            replay_exps["comparison/train-decision/"
                        f"{decision}/{threshold:.2f}/{run}"] = \
                _parametrize_replay(replay_kwargs=dict(
                    train_interval=1,  # Allow retraining every day.
                    # But don't automatically retrain -> decide each day.
                    train_threshold=threshold,
                    train_metric=decision,
                ))
    # Compare with and without JTT for training and MatchMaker for predictions.
    # -------------------------------------------------------------------------
    # To avoid a combinatorial number of experiments, we only test a few
    # combinations:
    # - Only JTT (what if we only try to improve training instead of samples?)
    # - JTT + Memento (what if we do both?)
    # - JTT + Memento + MatchMaker (what if we also improve predictions?)
    # All other possible experiments are outlined below. Uncomment if you want
    # to run them.
    # No upscaling.
    # JTT without upscaling is just training from random samples; we can use
    # the baseline result for that.
    # JTT (no upscaling) & Memento is just default Memento, use that as well.
    # JTT (no upscaling) & Memento & MatchMaker: default Memento + MatchMaker.
    replay_exps[
        f"combinations/memento-matchmaker/{run}"
    ] = _parametrize_replay(replay_kwargs=dict(matchmaker_predictors=7))
    # Uncomment to test only MatchMaker on its own.
    # replay_exps[
    #     f"combinations/matchmaker/{run}"
    # ] = _parametrize_replay(
    #     random_fraction=1.0,  # totally random mem.
    #     random_forget=0.1,  # with same forgetting rate as puffer.
    #     replay_kwargs=dict(matchmaker_predictors=7),
    # )
    # With JTT upscaling.
    for upscale in [2., 3., 5., 10., 20., 50., 100.]:
        # JTT only (better training with the same samples).
        replay_exps[f"combinations/jtt/{upscale:g}/{run}"] = \
            _parametrize_replay(
                random_fraction=1.0,  # totally random mem.
                random_forget=0.1,  # with same forgetting rate as puffer.
                replay_kwargs=dict(jtt_upscale=upscale),
            )
        # JTT & Memento.
        replay_exps[
            f"combinations/memento-jtt/{upscale:g}/{run}"
        ] = _parametrize_replay(replay_kwargs=dict(jtt_upscale=upscale))
        # JTT & Memento & MatchMaker.
        replay_exps[
            f"combinations/memento-matchmaker-jtt/{upscale:g}/{run}"
        ] = _parametrize_replay(
            replay_kwargs=dict(jtt_upscale=upscale, matchmaker_predictors=7),
        )
        # Uncomment to test only JTT & MatchMaker without Memento.
        # replay_exps[
        #     f"combinations/matchmaker-jtt/{upscale:g}/{run}"
        # ] = _parametrize_replay(
        #     random_fraction=1.0,  # totally random mem.
        #     random_forget=0.1,  # with same forgetting rate as puffer.
        #     replay_kwargs=dict(jtt_upscale=upscale, matchmaker_predictors=7),
        # )
        # JTT uses a first training round to decide what to upscale.
        # Alternatively, we could e.g. upscale all sessions that struggle.
        # First results were not promising, so it's commented out.
        # Uncomment if you want to try it out.
        # replay_exps[f"combinations/stalls/{upscale:g}/{run}"] = \
        #     _parametrize_replay(
        #         random_fraction=1.0,  # totally random mem.
        #         random_forget=0.1,  # with same forgetting rate as puffer.
        #         replay_kwargs=dict(stall_upscale=upscale),
        #     )
    # Alternatively, try selecting samples via QBC, optionally also using the
    # ensemble to predict (MatchMaker).
    # Note: Currently both QBC and MatchMaker just keep the last n models, so
    # we don't need to explicitly synchronize them.
    # Test a small committee, a larger committee, and a large one plus
    # MatchMaker.
    # replay_exps[f"combinations/qbc-small/{run}"] = _parametrize_replay(
    #     memcls=memory.PufferQBC, size=memsize, committee_size=2)
    replay_exps[f"combinations/qbc/{run}"] = _parametrize_replay(
        memcls=memory.PufferQBC, size=memsize, committee_size=7)
    replay_exps[f"combinations/qbc-matchmaker/{run}"] = _parametrize_replay(
        memcls=memory.PufferQBC, size=memsize, committee_size=7,
        replay_kwargs=dict(matchmaker_predictors=7))
# Analyze how well future samples are covered by the memory samples.
# ==================================================================
# Collect data over three weeks, then freeze the memory. For the rest of the
# year, only evaluate.
freeze_replayclass = partial(
    replay.PufferDataReplay,
    retrain_from="fugu_feb",  # Speed up training.
    train_threshold=0.0,
    train_interval=7,
    freeze_after=21,  # Do not update memory or retrain after iteration 21.
)
freeze_replay = partial(
    replay.evaluate, replaycls=freeze_replayclass,
    startday="2022-01-01", endday="2022-12-31",
)
replay_exps["freeze/memento/2022"] = partial(
    freeze_replay, memorycls=default_memento)
random_mem = partial(default_memento, random_fraction=1, random_forget=0.1)
# Analyze the memory selection in-depth over a month.
# ===================================================
selection_exp = partial(
    selection_analysis.analyse_selection,
    # Select 5M from the start of 2022.
    n_samples=5000000, start="2022-01-01", end="2022-01-31",
    batch_memorycls=default_memento,
)
replay_exps['selection/memento'] = partial(
    selection_exp, memorycls=default_memento)
# loss _batched_ means: same batching as Memento, but loss as metric.
# loss does not need batching, but we want to ensure that the only difference
# is the selection strategy.
replay_exps['selection/loss_batched'] = partial(
    selection_exp, memorycls=default_alternatives['loss_batched'])
# Alternative metrics: loss without batching, or confidence (max(p)).
# They don't provide additional insights compared to loss. Uncomment to run
# anyway.
# replay_exps['selection/loss'] = partial(
#     selection_exp, memorycls=default_alternatives['loss'])
# replay_exps['selection/confidence'] = partial(
#     selection_exp, memorycls=default_alternatives['confidence'])
# Continual learning deployment.
# ==============================
deployment_default = default_memento
# As a comparison, also use a Memento variant that does not choose randomly.
deployment_deterministic = partial(default_memento, temperature=0.0)
deployment_threshold = 0.1 # 10%
# Put this into a separate group of experiments.
deploy_group = "puffer-deployment"
deploydefs: Dict[str, Callable] = {}
deployment_start, deployment_end = data_first_and_last['deployment']
for index in range(5):  # Each index corresponds to a model horizon.
    deploydefs[f'default/{index}'] = partial(
        deployment.update, index=index,
        first_day=deployment_start, last_day=deployment_end,
        memcls=deployment_default, threshold=deployment_threshold,
        git_dir="/home/alex/puffer_fugu_variant"
    )
    deploydefs[f'deterministic/{index}'] = partial(
        deployment.update, index=index,
        first_day=deployment_start, last_day=deployment_end,
        memcls=deployment_deterministic, threshold=deployment_threshold,
        git_dir="/home/alex/puffer_fugu_variant_2"
    )
# Put the definitions into the utilities for running them and providing a CLI.
# ========================================================================
# We pass the config classes because they contain framework defaults as well.
puffer_download = ParametrizedExperiments(
    download_group, download_exps,
    configcls=config.PufferExperimentConfigNoGPU,
    cli_help="Download Puffer data.",
)
puffer_preprocess = ParametrizedExperiments(
    preprocess_group, preprocess_exps,
    configcls=config.PufferExperimentConfigNoGPU,
    cli_help="Preprocess Puffer data (optional).",
)
puffer_analysis = ParametrizedExperiments(
    analysis_group, analysis_exps,
    configcls=config.PufferExperimentConfig,
    cli_help="Analyze Puffer data and aggregate ABR performance.",
)
puffer_replay = ParametrizedExperiments(
    replay_group, replay_exps,
    configcls=config.PufferExperimentConfig,
    cli_help="Run Puffer data replay.",
)
puffer_deployment = ParametrizedExperiments(  # Note: different base config!
    deploy_group, deploydefs, configcls=config.PufferDeploymentConfig,
    cli_help="Update Puffer deployment of Memento.",
)