# workload_simulation.py
import argparse
import math
import os
import pickle
import random
from collections import namedtuple
from operator import itemgetter

import forgetting_curve as fc
import settings
from settings import ADJUSTMENT_CUTOFF, INDEX_DAY_INVESTIGATION, UNIFORM_CUTOFF, int_or_round_floating_itv
# TODO: a factor of 2.5 is assumed to give an 85% retention rate; this should be made a configurable parameter
# 200 days =>
SimulationResult = namedtuple("SimulationResult", ("pbt", "r_rate", "w_load", "efficiency", "factor"))
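# Example record (hypothetical numbers): a run at factor 2.5 might produce
# SimulationResult(pbt=0.85, r_rate=0.91, w_load=14.2, efficiency=0.064, factor=2.5),
# where efficiency is r_rate / w_load (see build_sim below).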
def normalize_with_threshold(date_due, effective_interval, ndays):
"""cut an interval if it is longer then the number of days of the simulation"""
# because there is no need to count days that won't be included
if date_due <= ndays:
return effective_interval
else:
delta = date_due - ndays
return effective_interval - delta
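# Worked example: with ndays = 100, a review due on day 110 after a 30-day
# interval only contributes the 20 days that fall inside the window:
# normalize_with_threshold(110, 30, 100) == 20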
def one_or_zero_if_investigation(nb_days):
"""count only review if its in the investigation period"""
if nb_days > INDEX_DAY_INVESTIGATION:
return 1
else:
return 0
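# e.g. with INDEX_DAY_INVESTIGATION = 30 (hypothetical value), a review on
# day 25 is not counted (returns 0) while a review on day 31 is (returns 1).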
def sim(pb, nsims, onfail, ndays=365, difficulty=settings.difficulty, factor=None):
"""simulate N_SIM times, the journey of a single card during N_DAYS, where the algorithm is set up
such as the probablity of success when a card is shown is pb.
Extracts two mean statistics : the true retention rate and the number of times the card was presented to the user.
"""
# Note: statistics are collected only starting from INDEX_DAY_INVESTIGATION
current_pb_success = pb
if factor is None:
factor = fc.factor_interv(pb, difficulty)
nb_reviews_list = []
n_sim = 0
integral_real_retentions = []
while n_sim < nsims:
if settings.ULTRA_RANDOM_CUTOFF:
#ndays_sim = random.randint(30*2, 30*12*4)
while True:
ndays_sim = int(random.expovariate(1/ndays))
if ndays_sim > 2:
break
elif UNIFORM_CUTOFF:
ndays_sim = random.randint(int(ndays * 50 / 100), int(ndays * 150 / 100))
else:
ndays_sim = ndays
integral_real_retention = 0
if INDEX_DAY_INVESTIGATION == 0:
decay = fc.get_decay(settings.difficulty, 1)
integral_real_retention += fc.real_retention_rate_by_interval(decay, 0, 1)
n_sim += 1
date_due = 1
current_interv = 1
number_of_review = 0
nb_days = 0
interval_thresholded_list = []
while nb_days < ndays_sim:
nb_days += 1
if nb_days == 1:
current_pb_success = settings.difficulty
if nb_days == date_due:
number_of_review += one_or_zero_if_investigation(nb_days)
success = random.random() < current_pb_success
if success:
                    # the scheduling maths uses the exact (latent) interval; the effective interval
                    # carries extra constraints (at least 1 day, rounded to a whole number), which loses precision
exact_theoretical_interv = (current_interv * factor)
effective_interval = int_or_round_floating_itv(max(exact_theoretical_interv, 1))
date_due = date_due + effective_interval
interval_thresholded = normalize_with_threshold(date_due, effective_interval, ndays=ndays_sim)
current_pb_success = fc.get_current_pb_success(current_interv, interval_thresholded, difficulty)
else:
# the card is reviewed ASAP
number_of_review += one_or_zero_if_investigation(nb_days)
                # Anki-style reset: the interval starts over from 1
if onfail == "reset":
effective_interval = 1
date_due = date_due + 1
current_pb_success = settings.difficulty
interval_thresholded = normalize_with_threshold(date_due, effective_interval, ndays=ndays_sim)
elif onfail == "stable":
effective_interval = int_or_round_floating_itv(max(current_interv/math.sqrt(factor), 1))
date_due = date_due + effective_interval
interval_thresholded = normalize_with_threshold(date_due, current_interv, ndays=ndays_sim)
current_pb_success = settings.difficulty
else:
raise Exception("unknown onfail value")
                # one more review has been planned, as above
                # because of rounding, the future probability of success is not exactly pb and must be recomputed
current_interv = effective_interval
if ADJUSTMENT_CUTOFF and date_due > ndays_sim:
                    # the card is due after the end of the simulation
                    # to be as fair as possible, we add an artificial workload malus for cards scheduled
                    # beyond the simulation, so that runs which push the card just past ndays are still penalised
investigation_delta = (ndays_sim - INDEX_DAY_INVESTIGATION)
offset_delta = date_due - ndays_sim
number_of_review += investigation_delta / (investigation_delta + offset_delta)
                    # if the offset is very small (e.g. 1), number_of_review increases by almost 1
                    # if the offset is as large as the investigation period, we add 0.5
if date_due > INDEX_DAY_INVESTIGATION and interval_thresholded != 0:
                    # the due date falls inside the investigation period, so statistics must be collected
                    # TODO: extract the statistics collection below into its own function
start_integral = max(INDEX_DAY_INVESTIGATION - nb_days, 0)
# if nb_days + interval_thresholded < N_DAYS:
# end_integral = interval_thresholded
# else:
# end_integral = N_DAYS - nb_days
end_integral = interval_thresholded
                    # it is fairer to count the retention rate up to the first review falling after ndays
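                    # Assumed model (sketch): under an exponential forgetting curve
                    # R(t) = exp(-decay * t), the mean retention over [a, b] is
                    # (exp(-decay * a) - exp(-decay * b)) / (decay * (b - a));
                    # real_retention_rate_by_interval presumably returns such a per-day mean,
                    # hence the multiplication by (end_integral - start_integral) below to
                    # recover the integral before the final division by total days.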
decay = fc.get_decay(current_pb_success, interval_thresholded)
                    integral_real_retention += (
                        fc.real_retention_rate_by_interval(decay, start_integral, end_integral)
                        * (end_integral - start_integral)
                    )
interval_thresholded_list.append(interval_thresholded)
if interval_thresholded_list:
integral_real_retentions.append(integral_real_retention / sum(interval_thresholded_list))
nb_reviews_list.append(number_of_review)
real_retention_mean = sum(integral_real_retentions) / nsims
nrev_mean = (sum(nb_reviews_list) / nsims)
return [real_retention_mean, nrev_mean]
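# Usage sketch (hypothetical values): simulate 1000 journeys of a card with an
# 85% target recall probability over a one-year horizon, resetting on failure:
#   r_rate, w_load = sim(0.85, nsims=1000, onfail="reset", ndays=365)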
def get_simdata(filepath):
    """Load pickled simulation results from a file."""
    with open(filepath, "rb") as picklefile:
        return pickle.load(picklefile)
def remove_irrelevant_options(outputs):
    """Remove points for which an unambiguously better option exists (both higher retention and smaller workload)."""
    results = outputs[:]
    print("length before removals " + str(len(results)))
    sorted_results = sorted(results, key=itemgetter(3), reverse=True)
    for r_good in sorted_results:
        for r in results[:]:
            if r[1] < r_good[1] and r[2] > r_good[2]:  # smaller retention and bigger workload
                try:
                    results.remove(r)
                except ValueError:
                    pass
    print("length after removals " + str(len(results)))
    return results
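# e.g. a point with r_rate 0.80 and w_load 20 (hypothetical numbers) is dropped
# when another point offers r_rate 0.85 for w_load 18; the survivors form the
# Pareto front of the (retention, workload) trade-off.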
def print_couple(v1, v2):
print(f"{v1:0.3f},{v2:0.3f}")
def print_alert_incorrect_spots(list_of_factor, ndays):
"""select factors producing due dates around the cutoff in few steps only"""
for f, factor in enumerate(list_of_factor):
itv = settings.default_interval
new_itv = itv
nbsteps = 0
index_day = itv
while index_day < ndays:
nbsteps +=1
nbsteps = 1
if nbsteps > 8 or index_day > ndays * 105/100:
break
new_itv = int_or_round_floating_itv(new_itv * factor)
index_day += new_itv
#print(factor, itv, ndays)
if ndays*95/100 < index_day < ndays*105/100:
print("the simulation may be incorrect on the viscinity of this factor " + str(factor))
def generate_list_of_factors():
    # Trick to get factors that are neat fractions.
    # Why? These factors produce sensibly different simulations: intervals are integers,
    # so fractional intervals get rounded; for the sake of the simulation, pick factors
    # that tend to produce neat intervals.
    # How? Take the values of simple fractions.
return [i / 120 for i in range(3 * 60, 10 * 200) if (i % 30 == 0) or (i % 40 == 0) or (i % 12 == 0)]
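# The first few factors generated are 1.5, 1.6, 1.666..., 1.7, 1.75, 1.8, 1.9, 2.0;
# each is a multiple of 1/120 whose numerator is divisible by 12, 30 or 40.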
def build_sim(nsimsbyfactor, onfail, ndays, difficulty, factor):
    """Run one batch of simulations for a given factor and wrap the results in a SimulationResult."""
    pbt = fc.get_pb_success_from_interval_modifier(factor / 2.5, difficulty)
    r_rate, w_load = sim(pbt, nsims=nsimsbyfactor, onfail=onfail, ndays=ndays, difficulty=difficulty, factor=factor)
    efficiency = r_rate / w_load
    return SimulationResult(pbt, r_rate, w_load, efficiency, factor)
def analyse(filepath):
    """Print retention/workload statistics from a pickled simulation output."""
    try:
        with open(filepath, "rb") as picklefile:
            total_output = pickle.load(picklefile)
    except (OSError, pickle.UnpicklingError):
        raise Exception("problem with the input file: either run a simulation with --run or give a valid file path")
print(" ")
print("factor value versus retention rate")
print("----------")
for output in total_output:
print_couple(output.factor, output.r_rate)
print(" ")
print("factor value versus efficiency")
print("----------")
for output in total_output:
print_couple(output.factor, output.efficiency)
print(" ")
print("factor value versus work")
print("----------")
for output in total_output:
print_couple(output.factor, output.w_load)
print("outputs : recall proba, real retention rate, workload, retention/workload")
best_productivity = max(total_output, key=itemgetter(3))
print("best_productivity")
print(best_productivity)
print("factor used")
print(best_productivity.factor)
least_work = min(total_output, key=itemgetter(2))
print("least work")
print(least_work)
print("factor used")
print(least_work[4])
for output in total_output:
if output[0] == settings.difficulty:
print("workload on default algorithm")
print(output)
break
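# Example invocations (file and directory names are hypothetical):
#   python workload_simulation.py --run --output results.pkl --verbose
#   python workload_simulation.py --analyse --input results.pkl
#   python workload_simulation.py --runopti --outputdir sweep --onfail stable --ndays 365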
if __name__ == "__main__":
my_parser = argparse.ArgumentParser(description="run or analyse simulation of spaced repetition algorithm")
my_parser.add_argument("--run", action="store_true")
my_parser.add_argument("--runopti", action="store_true")
my_parser.add_argument("--analyse", action="store_true")
my_parser.add_argument("--input", nargs="?", type=str)
my_parser.add_argument("--onfail", nargs="?", type=str, default="reset")
my_parser.add_argument("--output", nargs="?", type=str)
my_parser.add_argument("--outputdir", nargs="?", type=str)
my_parser.add_argument("--nsimsbyfactor", nargs="?", type=int, default=settings.nsimsbyfactor)
my_parser.add_argument("--verbose", action="store_true")
my_parser.add_argument("--ndays", nargs="?", type=int, default=settings.ndays)
my_parser.add_argument("--difficulty", nargs="?", type=float, default=settings.difficulty)
args = my_parser.parse_args()
nsimsbyfactor = args.nsimsbyfactor
onfail = args.onfail
ndays = args.ndays
verbose = args.verbose
if args.runopti:
if args.outputdir:
os.mkdir(args.outputdir)
else:
raise ValueError("you need to specify output directory (--outputdir)")
list_of_factors = generate_list_of_factors()
step = 1
end = 98
success_rate = 65
while success_rate <= end:
success_rate += step
difficulty = success_rate/100
total_output = []
for factor in list_of_factors:
total_output.append(build_sim(nsimsbyfactor, onfail, ndays, difficulty, factor))
if verbose:
best_productivity = max(total_output, key=itemgetter(3))
print(success_rate, *best_productivity)
            with open(os.path.join(args.outputdir, str(success_rate) + ".pkl"), "wb") as picklefile:
pickle.dump(total_output, picklefile)
exit(0)
if args.run:
        total_output = []
        list_of_factors = generate_list_of_factors()
        # print_alert_incorrect_spots(list_of_factors, args.ndays)
        difficulty = args.difficulty
for factor in list_of_factors:
total_output.append(build_sim(nsimsbyfactor, onfail, ndays, difficulty, factor))
if args.output:
with open(args.output, "wb") as picklefile:
pickle.dump(total_output, picklefile)
if args.verbose:
analyse(args.output)
exit(0)
    if args.analyse:
        analyse(args.input)