Skip to content

Commit

Permalink
perf: Add hash-join for semijoin and antijoin. Refactor join code.
Browse files Browse the repository at this point in the history
  • Loading branch information
jheer committed Apr 2, 2021
1 parent 0d138ae commit 68ee5b4
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 101 deletions.
71 changes: 65 additions & 6 deletions src/engine/join-filter.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,33 @@
import { singleRowLookup } from './join/lookup';
import BitSet from '../table/bit-set';
import isArray from '../util/is-array';

export default function(tableL, tableR, predicate, options = {}) {
// calculate semi-join filter mask
const filter = new BitSet(tableL.totalRows());
const join = isArray(predicate) ? hashSemiJoin : loopSemiJoin;
join(filter, tableL, tableR, predicate);

// if anti-join, negate the filter
if (options.anti) {
filter.not().and(tableL.mask());
}

return tableL.create({ filter });
}

function hashSemiJoin(filter, tableL, tableR, [keyL, keyR]) {
// build lookup table
const lut = singleRowLookup(tableR, keyR);

// scan table, update filter with matches
tableL.scan((rowL, data) => {
const rowR = lut.get(keyL(rowL, data));
if (rowR >= 0) filter.set(rowL);
});
}

function loopSemiJoin(filter, tableL, tableR, predicate) {
const nL = tableL.numRows();
const nR = tableR.numRows();
const dataL = tableL.data();
Expand Down Expand Up @@ -31,11 +57,44 @@ export default function(tableL, tableR, predicate, options = {}) {
}
}
}
}

// if anti-join, negate the filter
if (options.anti) {
filter.not().and(tableL.mask());
}
// export default function(tableL, tableR, predicate, options = {}) {
// const filter = new BitSet(tableL.totalRows());
// const nL = tableL.numRows();
// const nR = tableR.numRows();
// const dataL = tableL.data();
// const dataR = tableR.data();

return tableL.create({ filter });
}
// if (tableL.isFiltered() || tableR.isFiltered()) {
// // use indices as at least one table is filtered
// const idxL = tableL.indices(false);
// const idxR = tableR.indices(false);
// for (let i = 0; i < nL; ++i) {
// const rowL = idxL[i];
// for (let j = 0; j < nR; ++j) {
// if (predicate(rowL, dataL, idxR[j], dataR)) {
// filter.set(rowL);
// break;
// }
// }
// }
// } else {
// // no filters, enumerate row indices directly
// for (let i = 0; i < nL; ++i) {
// for (let j = 0; j < nR; ++j) {
// if (predicate(i, dataL, j, dataR)) {
// filter.set(i);
// break;
// }
// }
// }
// }

// // if anti-join, negate the filter
// if (options.anti) {
// filter.not().and(tableL.mask());
// }

// return tableL.create({ filter });
// }
25 changes: 10 additions & 15 deletions src/engine/join.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { multiRowLookup } from './join/lookup';
import columnSet from '../table/column-set';
import concat from '../util/concat';
import isArray from '../util/is-array';
Expand Down Expand Up @@ -76,30 +77,24 @@ function loopJoin(emit, predicate, dataL, dataR, idxL, idxR, hitL, hitR, nL, nR)

function hashJoin(emit, [keyL, keyR], dataL, dataR, idxL, idxR, hitL, hitR, nL, nR) {
// determine which table to hash
let dataScan, keyScan, hitScan, idxScan, nScan;
let dataHash, keyHash, hitHash, idxHash, nHash;
let dataScan, keyScan, hitScan, idxScan;
let dataHash, keyHash, hitHash, idxHash;
let emitScan = emit;
if (nL >= nR) {
dataScan = dataL; keyScan = keyL; hitScan = hitL; idxScan = idxL; nScan = nL;
dataHash = dataR; keyHash = keyR; hitHash = hitR; idxHash = idxR; nHash = nR;
dataScan = dataL; keyScan = keyL; hitScan = hitL; idxScan = idxL;
dataHash = dataR; keyHash = keyR; hitHash = hitR; idxHash = idxR;
} else {
dataScan = dataR; keyScan = keyR; hitScan = hitR; idxScan = idxR; nScan = nR;
dataHash = dataL; keyHash = keyL; hitHash = hitL; idxHash = idxL; nHash = nL;
dataScan = dataR; keyScan = keyR; hitScan = hitR; idxScan = idxR;
dataHash = dataL; keyHash = keyL; hitHash = hitL; idxHash = idxL;
emitScan = (i, a, j, b) => emit(j, b, i, a);
}

// build lookup table
const lut = new Map();
for (let i = 0; i < nHash; ++i) {
const key = keyHash(idxHash[i], dataHash);
if (key != null && key === key) {
if (!lut.has(key)) lut.set(key, []);
lut.get(key).push(i);
}
}
const lut = multiRowLookup(idxHash, dataHash, keyHash);

// scan other table
for (let j = 0; j < nScan; ++j) {
const m = idxScan.length;
for (let j = 0; j < m; ++j) {
const rowScan = idxScan[j];
const list = lut.get(keyScan(rowScan, dataScan));
if (list) {
Expand Down
25 changes: 25 additions & 0 deletions src/engine/join/lookup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
export function singleRowLookup(table, hash) {
const lut = new Map();
table.scan((row, data) => {
const key = hash(row, data);
if (key != null && key === key) {
lut.set(key, row);
}
});
return lut;
}

export function multiRowLookup(idx, data, hash) {
const lut = new Map();
const n = idx.length;
for (let i = 0; i < n; ++i) {
const row = idx[i];
const key = hash(row, data);
if (key != null && key === key) {
lut.has(key)
? lut.get(key).push(row)
: lut.set(key, [row]);
}
}
return lut;
}
45 changes: 16 additions & 29 deletions src/engine/lookup.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,33 @@
import { singleRowLookup } from './join/lookup';
import { aggregateGet } from './reduce/util';
import columnSet from '../table/column-set';
import NULL from '../util/null';
import concat from '../util/concat';
import unroll from '../util/unroll';

export default function(tableL, tableR, [keyL, keyR], { names, exprs, ops }) {
// instantiate output data
const cols = columnSet(tableL);
const total = tableL.totalRows();
names.forEach(name => cols.add(name, Array(total)));
names.forEach(name => cols.add(name, Array(total).fill(NULL)));

// build lookup table
const lut = new Map();
tableR.scan((row, data) => {
const key = keyR(row, data);
if (key != null && key === key) {
lut.set(keyR(row, data), row);
}
});
const lut = singleRowLookup(tableR, keyR);

// generate setter function for lookup match
const set = unroll(
['lr', 'rr', 'data'],
'{' + concat(names, (_, i) => `_[${i}][lr] = $[${i}](rr, data);`) + '}',
names.map(name => cols.data[name]),
aggregateGet(tableR, ops, exprs)
);

// find matching rows
const rowL = new Int32Array(tableL.numRows());
const rowR = new Int32Array(tableL.numRows());
let m = 0;
// find matching rows, set values on match
const dataR = tableR.data();
tableL.scan((lrow, data) => {
const rrow = lut.get(keyL(lrow, data));
rowL[m] = lrow;
rowR[m] = rrow == null ? -1 : rrow;
++m;
if (rrow >= 0) set(lrow, rrow, dataR);
});

// output values for matching rows
const dataR = tableR.data();
const get = aggregateGet(tableR, ops, exprs);
const n = get.length;

for (let i = 0; i < n; ++i) {
const column = cols.data[names[i]];
const getter = get[i];
for (let j = 0; j < m; ++j) {
const rrow = rowR[j];
column[rowL[j]] = rrow >= 0 ? getter(rrow, dataR) : NULL;
}
}

return tableL.create(cols);
}
22 changes: 6 additions & 16 deletions src/verbs/join-filter.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
import _join_filter from '../engine/join-filter';
import { inferKeys, keyPredicate } from './util/join-keys';
import parse from '../expression/parse';
import { inferKeys } from './join';
import parseKey from './util/parse-key';
import isArray from '../util/is-array';
import toArray from '../util/to-array';

export default function(tableL, tableR, on, options) {
on = inferKeys(tableL, tableR, on);
on = isArray(on)
? toPredicate(
parseKey('join', tableL, on[0]),
parseKey('join', tableR, on[1])
)
: parse({ on }, { join: [tableL, tableR] }).exprs[0];

return _join_filter(tableL, tableR, on, options);
}
const predicate = isArray(on)
? keyPredicate(tableL, tableR, ...on.map(toArray))
: parse({ on }, { join: [tableL, tableR] }).exprs[0];

function toPredicate(keyL, keyR) {
return (rowL, dataL, rowR, dataR) => {
const kl = keyL(rowL, dataL);
const kr = keyR(rowR, dataR);
return kl === kr && kl != null && kr != null;
};
return _join_filter(tableL, tableR, predicate, options);
}
28 changes: 2 additions & 26 deletions src/verbs/join.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import _join from '../engine/join';
import parseKey from './util/parse-key';
import { inferKeys, keyPredicate } from './util/join-keys';
import parseValue from './util/parse';
import parse from '../expression/parse';
import { all, not } from '../helpers/selection';
import error from '../util/error';
import intersect from '../util/intersect';
import isArray from '../util/is-array';
import isString from '../util/is-string';
import toArray from '../util/to-array';
Expand All @@ -20,14 +18,7 @@ export default function(tableL, tableR, on, values, options = {}) {

if (isArray(on)) {
const [onL, onR] = on.map(toArray);
if (onL.length !== onR.length) {
error('Mismatched number of join keys');
}

predicate = [
parseKey('join', tableL, onL),
parseKey('join', tableR, onR)
];
predicate = keyPredicate(tableL, tableR, onL, onR);

if (!values) {
// infer output columns, suppress duplicated key columns
Expand All @@ -49,21 +40,6 @@ export default function(tableL, tableR, on, values, options = {}) {
);
}

export function inferKeys(tableL, tableR, on) {
if (!on) {
// perform natural join if join condition not provided
const isect = intersect(tableL.columnNames(), tableR.columnNames());
if (!isect.length) error('Natural join requires shared column names.');
on = [isect, isect];
} else if (isString(on)) {
on = [on, on];
} else if (isArray(on) && on.length === 1) {
on = [on[0], on[0]];
}

return on;
}

function inferValues(tableL, onL, onR, options) {
const isect = [];
onL.forEach((s, i) => isString(s) && s === onR[i] ? isect.push(s) : 0);
Expand Down
2 changes: 1 addition & 1 deletion src/verbs/lookup.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _lookup from '../engine/lookup';
import { inferKeys } from './join';
import { inferKeys } from './util/join-keys';
import parseKey from './util/parse-key';
import parseValues from './util/parse';

Expand Down
30 changes: 30 additions & 0 deletions src/verbs/util/join-keys.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import parseKey from './parse-key';
import error from '../../util/error';
import intersect from '../../util/intersect';
import isArray from '../../util/is-array';
import isString from '../../util/is-string';

export function inferKeys(tableL, tableR, on) {
if (!on) {
// perform natural join if join condition not provided
const isect = intersect(tableL.columnNames(), tableR.columnNames());
if (!isect.length) error('Natural join requires shared column names.');
on = [isect, isect];
} else if (isString(on)) {
on = [on, on];
} else if (isArray(on) && on.length === 1) {
on = [on[0], on[0]];
}

return on;
}

export function keyPredicate(tableL, tableR, onL, onR) {
if (onL.length !== onR.length) {
error('Mismatched number of join keys');
}
return [
parseKey('join', tableL, onL),
parseKey('join', tableR, onR)
];
}
Loading

0 comments on commit 68ee5b4

Please sign in to comment.