-
Notifications
You must be signed in to change notification settings - Fork 12
/
job_shop_scheduler.py
345 lines (293 loc) · 14.9 KB
/
job_shop_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
from __future__ import print_function
from bisect import bisect_right
from os import PathLike
import dwavebinarycsp
def get_jss_bqm(job_dict, max_time, disable_till=None, disable_since=None, disabled_variables=None, stitch_kwargs=None):
"""Returns a BQM to the Job Shop Scheduling problem.
Args:
job_dict: A dict. Contains the jobs we're interested in scheduling. (See Example below.)
max_time: An integer. The upper bound on the amount of time the schedule can take.
stitch_kwargs: A dict. Kwargs to be passed through get_jss_bqm to dwavebinarycsp.stitch.
Returns:
A dimod.BinaryQuadraticModel. Note: The nodes in the BQM are labelled in the format,
<job_name>_<task_number>,<time>. (See Example below)
Example:
'jobs' dict describes the jobs we're interested in scheduling. Namely, the dict key is the
name of the job and the dict value is the ordered list of tasks that the job must do.
It follows the format:
{"job_name": [(machine_name, time_duration_on_machine), ..],
"another_job_name": [(some_machine, time_duration_on_machine), ..]}
>>> # Create BQM
>>> jobs = {"a": [("mixer", 2), ("oven", 1)],
"b": [("mixer", 1)],
"c": [("oven", 2)]}
>>> max_time = 4 # Put an upperbound on how long the schedule can be
>>> bqm = get_jss_bqm(jobs, max_time, stitch_kwargs)
>>> # May need to tweak the chain strength and the number of reads
>>> sampler = EmbeddingComposite(DWaveSampler())
>>> sampleset = sampler.sample(bqm, chain_strength=2, num_reads=1000)
>>> # Results
>>> # Note: Each node follows the format <job_name>_<task_number>,<time>.
>>> print(sampleset)
c_0,0 b_0,1 c_0,1 b_0,3 c_0,2 b_0,0 b_0,2 a_1,2 a_1,3 a_1,1 a_0,0 a_1,0 a_0,1 a_0,2
1 0 0 0 0 0 1 1 0 0 1 0 0 0
Interpreting Results:
Consider the node, "b_0,2" with a value of 1.
- "b_0,2" is interpreted as job b, task 0, at time 2
- Job b's 0th task is ("mixer", 1)
- Hence, at time 2, Job b's 0th task is turned on
Consider the node, "a_1,0" with a value of 0.
- "a_1,0" is interpreted as job a, task 1, at time 0
- Job a's 1st task is ("oven", 1)
- Hence, at time 0, Job a's 1st task is not run
"""
if stitch_kwargs is None:
stitch_kwargs = {}
if disable_till is None:
disable_till = {}
if disable_since is None:
disable_since = {}
if disabled_variables is None:
disabled_variables = []
scheduler = JobShopScheduler(job_dict, max_time)
return scheduler.get_bqm(disable_till, disable_since, disabled_variables, stitch_kwargs)
def sum_to_one(*args):
return sum(args) == 1
def get_label(task, time):
"""Creates a standardized name for variables in the constraint satisfaction problem,
JobShopScheduler.csp.
"""
return f"{task.job}_{task.position},{time}"
class Task:
def __init__(self, job, position, machine, duration):
self.job = job
self.position = position
self.machine = machine
self.duration = duration
def __repr__(self):
return ("{{job: {job}, position: {position}, machine: {machine}, duration:"
" {duration}}}").format(**vars(self))
class KeyList:
"""A wrapper to an array. Used for passing the key of a custom object to the bisect function.
Note: bisect function does not let you choose an arbitrary key, hence this class was created.
"""
def __init__(self, array, key_function):
self.array = array # An iterable
self.key_function = key_function # Function for grabbing the key of a given item
def __len__(self):
return len(self.array)
def __getitem__(self, index):
item = self.array[index]
key = self.key_function(item)
return key
class JobShopScheduler:
def __init__(self, job_dict, max_time=None):
"""
Args:
job_dict: A dictionary. It describes the jobs that need to be scheduled. Namely, the
dict key is the name of the job and the dict value is the ordered list of tasks that
the job must do. (See Job Dict Details below.)
max_time: An integer. The upper bound on the amount of time the schedule can take.
Job Dict Details:
The job_dict has the following format:
{"job_name": [(machine_name, integer_time_duration_on_machine), ..],
..
"another_job_name": [(some_machine, integer_time_duration_on_machine), ..]}
A small job_dict example:
jobs = {"job_a": [("mach_1", 2), ("mach_2", 2), ("mach_3", 2)],
"job_b": [("mach_3", 3), ("mach_2", 1), ("mach_1", 1)],
"job_c": [("mach_2", 2), ("mach_1", 3), ("mach_2", 1)]}
"""
self.tasks = []
self.last_task_indices = []
self.max_time = max_time
self.csp = dwavebinarycsp.ConstraintSatisfactionProblem(
dwavebinarycsp.BINARY)
# Populates self.tasks and self.max_time
self._process_data(job_dict)
def _process_data(self, jobs):
"""Process user input into a format that is more convenient for JobShopScheduler functions.
"""
# Create and concatenate Task objects
tasks = []
last_task_indices = [-1] # -1 for zero-indexing
total_time = 0 # total time of all jobs
for job_name, job_tasks in jobs.items():
last_task_indices.append(last_task_indices[-1] + len(job_tasks))
for i, (machine, time_span) in enumerate(job_tasks):
tasks.append(Task(job_name, i, machine, time_span))
total_time += time_span
# Update values
self.tasks = tasks
self.last_task_indices = last_task_indices[1:]
if self.max_time is None:
self.max_time = total_time
def _add_one_start_constraint(self):
"""self.csp gets the constraint: A task can start once and only once
"""
for task in self.tasks:
task_times = {get_label(task, t) for t in range(self.max_time)}
self.csp.add_constraint(sum_to_one, task_times)
def _add_precedence_constraint(self):
"""self.csp gets the constraint: Task must follow a particular order.
Note: assumes self.tasks are sorted by jobs and then by position
"""
valid_edges = {(0, 0), (1, 0), (0, 1)}
for current_task, next_task in zip(self.tasks, self.tasks[1:]):
if current_task.job != next_task.job:
continue
# Forming constraints with the relevant times of the next task
for t in range(self.max_time):
current_label = get_label(current_task, t)
for tt in range(min(t + current_task.duration, self.max_time)):
next_label = get_label(next_task, tt)
self.csp.add_constraint(
valid_edges, {current_label, next_label})
def _add_share_machine_constraint(self):
"""self.csp gets the constraint: At most one task per machine per time unit
"""
sorted_tasks = sorted(self.tasks, key=lambda x: x.machine)
# Key wrapper for bisect function
wrapped_tasks = KeyList(sorted_tasks, lambda x: x.machine)
head = 0
valid_values = {(0, 0), (1, 0), (0, 1)}
while head < len(sorted_tasks):
# Find tasks that share a machine
tail = bisect_right(wrapped_tasks, sorted_tasks[head].machine)
same_machine_tasks = sorted_tasks[head:tail]
# Update
head = tail
# No need to build coupling for a single task
if len(same_machine_tasks) < 2:
continue
# Apply constraint between all tasks for each unit of time
for task in same_machine_tasks:
for other_task in same_machine_tasks:
if task.job == other_task.job and task.position == other_task.position:
continue
for t in range(self.max_time):
current_label = get_label(task, t)
for tt in range(t, min(t + task.duration, self.max_time)):
self.csp.add_constraint(valid_values, {current_label,
get_label(other_task, tt)})
def _remove_absurd_times(self, disable_till: dict, disable_since, disabled_variables):
"""Sets impossible task times in self.csp to 0.
Args:
disabled_times (dict):
s - start of disabled region (included)
e - end of disabled region (excluded)
{"machine_1": [(s1, e1), (s2, e2), (s3, e3)],
"machine_2": [(s1, e1), (s2, e2)],
"machine_3": [(s1, e1), (s2, e2)]}
"""
# Times that are too early for task
predecessor_time = 0
current_job = self.tasks[0].job
for task in self.tasks:
# Check if task is in current_job
if task.job != current_job:
predecessor_time = 0
current_job = task.job
for t in range(predecessor_time):
label = get_label(task, t)
self.csp.fix_variable(label, 0)
predecessor_time += task.duration
# Times that are too late for task
# Note: we are going through the task list backwards in order to compute
# the successor time
# start with -1 so that we get (total task time - 1)
successor_time = -1
current_job = self.tasks[-1].job
for task in self.tasks[::-1]:
# Check if task is in current_job
if task.job != current_job:
successor_time = -1
current_job = task.job
successor_time += task.duration
for t in range(successor_time):
# -1 for zero-indexed time
label = get_label(task, (self.max_time - 1) - t)
self.csp.fix_variable(label, 0)
# Times that are interfering with disabled regions
# disabled variables, disable_till and disable_since
# are explained in instance_parser.py
for task in self.tasks:
if task.machine in disable_till.keys():
for i in range(disable_till[task.machine]):
label = get_label(task, i)
if label in self.csp.variables:
self.csp.fix_variable(label, 0)
elif task.machine in disable_since.keys():
for i in range(disable_since[task.machine], self.max_time):
label = get_label(task, i)
if label in self.csp.variables:
self.csp.fix_variable(label, 0)
# Times that are manually disabled
for label in disabled_variables:
if label in self.csp.variables:
self.csp.fix_variable(label, 0)
def get_bqm(self, disable_till, disable_since, disabled_variables, stitch_kwargs=None):
"""Returns a BQM to the Job Shop Scheduling problem.
Args:
stitch_kwargs: A dict. Kwargs to be passed to dwavebinarycsp.stitch.
"""
if stitch_kwargs is None:
stitch_kwargs = {}
# Apply constraints to self.csp
self._add_one_start_constraint()
self._add_precedence_constraint()
self._add_share_machine_constraint()
self._remove_absurd_times(disable_till, disable_since, disabled_variables)
bqm = dwavebinarycsp.stitch(self.csp, **stitch_kwargs)
# Edit BQM to encourage the shortest schedule
# Overview of this added penalty:
# - Want any-optimal-schedule-penalty < any-non-optimal-schedule-penalty
# - Suppose there are N tasks that need to be scheduled and N > 0
# - Suppose the the optimal end time for this schedule is t_N
# - Then the worst optimal schedule would be if ALL the tasks ended at time t_N. (Since
# the optimal schedule is only dependent on when the LAST task is run, it is irrelevant
# when the first N-1 tasks end.) Note that by "worst" optimal schedule, I am merely
# referring to the most heavily penalized optimal schedule.
#
# Show math satisfies any-optimal-schedule-penalty < any-non-optimal-schedule-penalty:
# - Penalty scheme. Each task is given the penalty: base^(task-end-time). The penalty
# of the entire schedule is the sum of penalties of these chosen tasks.
# - Chose the base of my geometric series to be N+1. This simplifies the math and it will
# become apparent why it's handy later on.
#
# - Comparing the SUM of penalties between any optimal schedule (on left) with that of the
# WORST optimal schedule (on right). As shown below, in this penalty scheme, any optimal
# schedule penalty <= the worst optimal schedule.
# sum_i (N+1)^t_i <= N * (N+1)^t_N, where t_i the time when the task i ends [eq 1]
#
# - Now let's show that all optimal schedule penalties < any non-optimal schedule penalty.
# We can prove this by applying eq 1 and simply proving that the worst optimal schedule
# penalty (below, on left) is always less than any non-optimal schedule penalty.
# N * (N+1)^t_N < (N+1)^(t_N + 1)
# Note: t_N + 1 is the smallest end time for a non-optimal
# schedule. Hence, if t_N' is the end time of the last
# task of a non-optimal schedule, t_N + 1 <= t_N'
# <= (N+1)^t_N'
# < sum^(N-1) (N+1)^t_i' + (N+1)^t_N'
# = sum^N (N+1)^t_i'
# Note: sum^N (N+1)^t' is the sum of penalties for a
# non-optimal schedule
#
# - Therefore, with this penalty scheme, all optimal solution penalties < any non-optimal
# solution penalties
base = len(self.last_task_indices) + 1 # Base for exponent
# Get our pruned (remove_absurd_times) variable list so we don't undo pruning
pruned_variables = list(bqm.variables)
for i in self.last_task_indices:
task = self.tasks[i]
for t in range(self.max_time):
end_time = t + task.duration
# Check task's end time; do not add in absurd times
if end_time > self.max_time:
continue
# Add bias to variable
bias = 2 * base**(end_time - self.max_time)
label = get_label(task, t)
if label in pruned_variables:
bqm.add_variable(label, bias)
return bqm