// local-hypertable.c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include "cilk-internal.h"
#include "debug.h"
#include "internal-malloc.h" /* only needed for new view allocation */
#include "local-hypertable.h"

static void reducer_base_init(reducer_base *rb) {
    rb->view = NULL;
    rb->reduce_fn = NULL;
}

static void make_tombstone(uintptr_t *key) { *key = KEY_DELETED; }

static void bucket_init(struct bucket *b) {
    b->key = KEY_EMPTY;
    reducer_base_init(&b->value);
}

// Constant used to determine the target maximum load factor. The
// table will aim for a maximum load factor of
// 1 - (1 / LOAD_FACTOR_CONSTANT).
static const int32_t LOAD_FACTOR_CONSTANT = 16;

// Prevent integer overflow when computing the load factor.
__attribute__((unused))
static const int32_t MAX_CAPACITY = 0x7fffffff / (LOAD_FACTOR_CONSTANT - 1);

static bool is_overloaded(int32_t occupancy, int32_t capacity) {
    // Set the upper load threshold to be 15/16ths of the capacity.
    return occupancy >
           (LOAD_FACTOR_CONSTANT - 1) * capacity / LOAD_FACTOR_CONSTANT;
}

static bool is_underloaded(int32_t occupancy, int32_t capacity) {
    // Set the lower load threshold to be 7/16ths of the capacity.
    return (capacity > MIN_CAPACITY) &&
           (occupancy <=
            ((LOAD_FACTOR_CONSTANT / 2) - 1) * capacity / LOAD_FACTOR_CONSTANT);
}

// After enough insertions and deletions have occurred, rebuild the
// table to fix up tombstones.
static const int32_t MIN_REBUILD_OP_COUNT = 8;
static bool time_to_rebuild(int32_t ins_rm_count, int32_t capacity) {
    return (ins_rm_count > MIN_REBUILD_OP_COUNT) &&
           (ins_rm_count > capacity / (4 * LOAD_FACTOR_CONSTANT));
}
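
// Worked example of the thresholds above: with LOAD_FACTOR_CONSTANT == 16
// and capacity == 1024, the table counts as overloaded once occupancy
// exceeds 15 * 1024 / 16 == 960, as underloaded once occupancy drops to
// 7 * 1024 / 16 == 448 or below, and a tombstone-clearing rebuild fires
// once ins_rm_count exceeds max(MIN_REBUILD_OP_COUNT, 1024 / 64) == 16.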

static struct bucket *bucket_array_create(int32_t array_size) {
    struct bucket *buckets =
        (struct bucket *)calloc(array_size, sizeof(struct bucket));
    if (array_size < MIN_HT_CAPACITY) {
        for (int32_t i = 0; i < array_size; ++i) {
            bucket_init(&buckets[i]);
        }
        return buckets;
    }
    int32_t tombstone_idx = 0;
    for (int32_t i = 0; i < array_size; ++i) {
        bucket_init(&buckets[i]);
        // Graveyard hashing: Insert tombstones at regular intervals.
        // TODO: Check if it's bad for the insertions to rebuild a
        // table to use these tombstones.
        if (tombstone_idx == 2 * LOAD_FACTOR_CONSTANT) {
            make_tombstone(&buckets[i].key);
            tombstone_idx -= 2 * LOAD_FACTOR_CONSTANT;
        } else
            ++tombstone_idx;
    }
    return buckets;
}
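
// With LOAD_FACTOR_CONSTANT == 16, the loop above plants a tombstone after
// every run of 2 * LOAD_FACTOR_CONSTANT == 32 freshly initialized buckets,
// so roughly 1 slot in 33 of a new array starts out as a tombstone.
// Seeding tombstones at regular intervals is the core idea of graveyard
// hashing, which uses them to keep probe runs short even under workloads
// that mix insertions and deletions.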

hyper_table *__cilkrts_local_hyper_table_alloc(void) {
    hyper_table *table = malloc(sizeof(hyper_table));
    int32_t capacity = MIN_CAPACITY;
    table->capacity = capacity;
    table->occupancy = 0;
    table->ins_rm_count = 0;
    table->buckets = bucket_array_create(capacity);
    return table;
}

void local_hyper_table_free(hyper_table *table) {
    free(table->buckets);
    free(table);
}

static struct bucket *rebuild_table(hyper_table *table, int32_t new_capacity) {
    struct bucket *old_buckets = table->buckets;
    int32_t old_capacity = table->capacity;
    int32_t old_occupancy = table->occupancy;
    assert(new_capacity <= MAX_CAPACITY);

    table->buckets = bucket_array_create(new_capacity);
    table->capacity = new_capacity;
    table->occupancy = 0;
    // Set the count of insertions and removals to prevent insertions into
    // the new table from triggering another rebuild.
    table->ins_rm_count = -old_occupancy;

    // Iterate through the old table and insert each element into the new
    // table.
    for (int32_t i = 0; i < old_capacity; ++i) {
        if (is_valid(old_buckets[i].key)) {
            bool success = insert_hyperobject(table, old_buckets[i]);
            assert(success && "Failed to insert when resizing table.");
            (void)success;
        }
    }

    assert(table->occupancy == old_occupancy &&
           "Mismatched occupancy after resizing table.");
    free(old_buckets);
    return table->buckets;
}

///////////////////////////////////////////////////////////////////////////
// Query, insert, and delete methods for the hash table.

struct bucket *__cilkrts_find_hyperobject_hash(hyper_table *table,
                                               uintptr_t key) {
    int32_t capacity = table->capacity;

    // Target hash
    const index_t tgt = get_table_entry(capacity, key);
    struct bucket *buckets = table->buckets;
    // Start the probe at the target hash
    index_t i = tgt;
    do {
        uintptr_t curr_key = buckets[i].key;
        // Found the key? Return that bucket.
        // TODO: Consider moving this bucket to the front of the run.
        if (key == curr_key)
            return &buckets[i];

        // Found an empty entry? The probe failed.
        if (is_empty(curr_key))
            return NULL;

        // Found a tombstone? Continue the probe.
        if (is_tombstone(curr_key)) {
            i = inc_index(i, capacity);
            continue;
        }

        // Otherwise, buckets[i] is another valid key that does not match.
        index_t curr_hash = buckets[i].hash;
        if (continue_probe(tgt, curr_hash, i)) {
            i = inc_index(i, capacity);
            continue;
        }

        // If none of the above cases match, then the probe failed to
        // find the key.
        return NULL;
    } while (i != tgt);

    // The probe failed to find the key.
    return NULL;
}
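
// How the early termination works (an explanatory note; continue_probe
// itself is defined outside this file): each valid bucket caches its home
// slot in buckets[i].hash, and insert_hyperobject below arranges the
// buckets of a probe run so that continue_probe can compare the target's
// home slot against the current bucket's. Once that comparison shows the
// key could not appear any later in the run, the search stops without
// scanning ahead to the next empty slot.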

bool remove_hyperobject(hyper_table *table, uintptr_t key) {
    if (table->capacity < MIN_HT_CAPACITY) {
        // If the table is small enough, just scan the array.
        struct bucket *buckets = table->buckets;
        int32_t occupancy = table->occupancy;
        for (int32_t i = 0; i < occupancy; ++i) {
            if (buckets[i].key == key) {
                if (i == occupancy - 1)
                    // Set this entry's key to empty. This code is here
                    // primarily to handle the case where occupancy == 1.
                    buckets[i].key = KEY_EMPTY;
                else
                    // Remove this entry by swapping it with the last entry.
                    buckets[i] = buckets[occupancy - 1];
                // Decrement the occupancy.
                --table->occupancy;
                return true;
            }
        }
        return false;
    }

    // Find the key in the table.
    struct bucket *entry = find_hyperobject(table, key);

    // If entry is NULL, the probe did not find the key.
    if (NULL == entry)
        return false;

    // The probe found the key and returned a pointer to the entry.
    // Replace the entry with a tombstone and decrement the occupancy.
    make_tombstone(&entry->key);
    --table->occupancy;
    ++table->ins_rm_count;

    int32_t capacity = table->capacity;
    if (is_underloaded(table->occupancy, capacity))
        rebuild_table(table, capacity / 2);
    else if (time_to_rebuild(table->ins_rm_count, capacity))
        rebuild_table(table, capacity);

    return true;
}
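
// Note that deletion from the hashed representation leaves a tombstone
// rather than eagerly compacting the run, so each removal is cheap; the
// compaction cost is deferred to the shrinking rebuild triggered by
// is_underloaded or the same-size rebuild triggered by time_to_rebuild.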

bool insert_hyperobject(hyper_table *table, struct bucket b) {
    assert(b.key != KEY_EMPTY && b.key != KEY_DELETED);
    int32_t capacity = table->capacity;
    struct bucket *buckets = table->buckets;
    if (capacity < MIN_HT_CAPACITY) {
        // If the table is small enough, just scan the array.
        int32_t occupancy = table->occupancy;
        if (occupancy < capacity) {
            for (int32_t i = 0; i < occupancy; ++i) {
                if (buckets[i].key == b.key) {
                    // The key is already in the table. Overwrite.
                    buckets[i] = b;
                    return true;
                }
            }
            // The key is not already in the table. Append the bucket.
            buckets[occupancy] = b;
            ++table->occupancy;
            return true;
        }
        // The small table is full. Increase its capacity, convert it
        // to a hash table, and fall through to insert the new bucket
        // into that hash table.
        capacity *= 2;
        buckets = rebuild_table(table, capacity);
    }

    // If the occupancy is already too high, rebuild the table.
    if (is_overloaded(table->occupancy, capacity)) {
        capacity *= 2;
        buckets = rebuild_table(table, capacity);
    } else if (time_to_rebuild(table->ins_rm_count, capacity)) {
        buckets = rebuild_table(table, capacity);
    }

    // Target hash
    const index_t tgt = get_table_entry(capacity, b.key);
    b.hash = tgt;

    // If we find an empty entry, insert the bucket there.
    if (is_empty(buckets[tgt].key)) {
        buckets[tgt] = b;
        ++table->occupancy;
        ++table->ins_rm_count;
        return true;
    }

    // Probe for the place to insert b.
    index_t i = tgt;
    const index_t probe_end = tgt;
    do {
        uintptr_t curr_key = buckets[i].key;
        // Found the key? Overwrite that bucket.
        // TODO: Reconsider what to do in this case.
        if (b.key == curr_key) {
            buckets[i].value = b.value;
            return true;
        }

        // Found an empty entry? Insert b there.
        if (is_empty(curr_key)) {
            buckets[i] = b;
            ++table->occupancy;
            ++table->ins_rm_count;
            return true;
        }

        // Found a tombstone?
        if (is_tombstone(curr_key)) {
            index_t current_tomb = i;
            // Scan consecutive tombstones from i.
            index_t next_i = inc_index(i, capacity);
            uintptr_t tomb_end = buckets[next_i].key;
            while (next_i != probe_end && is_tombstone(tomb_end)) {
                next_i = inc_index(next_i, capacity);
                tomb_end = buckets[next_i].key;
            }
            // If the next entry is empty, then the probe would stop. It's
            // safe to insert the bucket at the tombstone at i.
            if (is_empty(tomb_end)) {
                buckets[current_tomb] = b;
                ++table->occupancy;
                ++table->ins_rm_count;
                return true;
            }
            // Check if the hash at the end of this run of tombstones would
            // terminate the probe or if the probe has traversed the whole
            // table.
            index_t tomb_end_hash = buckets[next_i].hash;
            if (next_i == probe_end ||
                !continue_probe(tgt, tomb_end_hash, next_i)) {
                // It's safe to insert b at the current tombstone.
                buckets[current_tomb] = b;
                ++table->occupancy;
                ++table->ins_rm_count;
                return true;
            }
            // None of the locations among these consecutive tombstones are
            // appropriate for this bucket. Continue the probe.
            i = next_i;
            continue;
        }

        // Otherwise this entry contains another valid key that does
        // not match. Compare the hashes to decide whether or not to
        // continue the probe.
        index_t curr_hash = buckets[i].hash;
        if (continue_probe(tgt, curr_hash, i)) {
            i = inc_index(i, capacity);
            continue;
        }

        // This is an appropriate location to insert the bucket. Stop
        // the probe.
        break;
    } while (i != probe_end);

    index_t insert_tgt = i;
    // The probe found a place to insert the bucket, but it's occupied. Insert
    // the bucket here and shift the subsequent entries.
    do {
        // If this entry is empty or a tombstone, insert the current bucket at
        // this location and terminate.
        if (!is_valid(buckets[i].key)) {
            buckets[i] = b;
            ++table->occupancy;
            ++table->ins_rm_count;
            return true;
        }
        // Swap b with the current bucket.
        struct bucket tmp = buckets[i];
        buckets[i] = b;
        b = tmp;

        // Continue onto the next index.
        i = inc_index(i, capacity);
    } while (i != insert_tgt);

    assert(i != insert_tgt && "Insertion failed.");
    return false;
}
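
// The final do-while above is the displacement step of this probing
// scheme: b lands in the slot the probe selected, and the bucket it
// evicts is carried forward, each occupant shifting one slot down the
// run, until a tombstone or empty slot absorbs the last one. If the loop
// instead wraps all the way back to insert_tgt, every slot holds a valid
// key, the table is full, and the function reports failure; the trailing
// assert then fires in debug builds, while release builds return false.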

void *__cilkrts_insert_new_view(hyper_table *table, uintptr_t key, size_t size,
                                __cilk_identity_fn identity,
                                __cilk_reduce_fn reduce) {
    // Create a new view and initialize it with the identity function.
    void *new_view = cilk_aligned_alloc(64, round_size_to_alignment(64, size));
    identity(new_view);

    // Insert the new view into the local hypertable.
    struct bucket new_bucket = {
        .key = (uintptr_t)key,
        .value = {.view = new_view, .reduce_fn = reduce}};
    bool success = insert_hyperobject(table, new_bucket);
    assert(success);
    (void)success;

    // Return the new view.
    return new_view;
}
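
// A usage sketch (inferred from this file's interfaces, not a documented
// contract): a caller that misses on a reducer key can fetch-or-create a
// view along these lines:
//
//     struct bucket *hit = find_hyperobject(table, key);
//     void *view = hit ? hit->value.view
//                      : __cilkrts_insert_new_view(table, key, size,
//                                                  identity, reduce);
//
// where key identifies the hyperobject (stored here as a uintptr_t) and
// identity/reduce are the callbacks associated with it.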

// Merge two hypertables, left and right. Returns the merged hypertable and
// deletes the other.
hyper_table *merge_two_hts(hyper_table *restrict left,
                           hyper_table *restrict right) {
    // In the trivial case of an empty hyper_table, return the other
    // hyper_table.
    if (!left)
        return right;
    if (!right)
        return left;
    if (left->occupancy == 0) {
        local_hyper_table_free(left);
        return right;
    }
    if (right->occupancy == 0) {
        local_hyper_table_free(right);
        return left;
    }

    // Pick the smaller hyper_table to be the source, which we will iterate
    // over.
    bool left_dst;
    hyper_table *src, *dst;
    if (left->occupancy >= right->occupancy) {
        src = right;
        dst = left;
        left_dst = true;
    } else {
        src = left;
        dst = right;
        left_dst = false;
    }

    int32_t src_capacity =
        (src->capacity < MIN_HT_CAPACITY) ? src->occupancy : src->capacity;
    struct bucket *src_buckets = src->buckets;
    // Iterate over the contents of the source hyper_table.
    for (int32_t i = 0; i < src_capacity; ++i) {
        struct bucket b = src_buckets[i];
        if (!is_valid(b.key))
            continue;

        // For each valid key in the source table, look up that key in the
        // destination table.
        struct bucket *dst_bucket = find_hyperobject(dst, b.key);

        if (NULL == dst_bucket) {
            // The destination table does not contain this key. Insert the
            // key-value pair from the source table into the destination.
            insert_hyperobject(dst, b);
        } else {
            // Merge the two views in the source and destination buckets, being
            // sure to preserve left-to-right ordering. Free the right view
            // when done.
            reducer_base dst_rb = dst_bucket->value;
            if (left_dst) {
                dst_rb.reduce_fn(dst_rb.view, b.value.view);
                free(b.value.view);
            } else {
                dst_rb.reduce_fn(b.value.view, dst_rb.view);
                free(dst_rb.view);
                dst_bucket->value.view = b.value.view;
            }
        }
    }

    // Destroy the source hyper_table, and return the destination.
    local_hyper_table_free(src);
    return dst;
}
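
// Why the branch above preserves left-to-right ordering: when left is the
// destination, the destination view is the left operand of reduce_fn and
// the source (right) view is freed; when right is the destination, the
// source (left) view is the left operand, the old destination (right)
// view is freed, and the merged left view replaces it in the destination
// bucket. Either way, the surviving view reflects
// reduce(left_view, right_view).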