Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/more efficient limit #246

Open
wants to merge 4 commits into
base: feature/hydrate-queries
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,5 +325,9 @@ All notable changes to this project will be documented in this file. Breaking ch
### Added
- Adds new query execution option `hydrate`. When a GSI uses a `KEYS_ONLY` projection, the `hydrate` option will perform the query and then a subsequent `batchGet` to "hydrate" the results.

### Changed
- The execution option `limit` is now better optimized for cases where filters might result in index "misses". The `limit` option used to use the `Limit` parameter, but this could result in requiring more requests when user applied filters caused no items from being returned in a single request.

### Fixed
- A common issue amongst new users, was unexpected errors when using a terminal method twice on a query chain. This would often come up when a user called `.params()` to log out parameters and then call `.go()` on the same chain. The fix was to prevent duplicative side effects from occurring on each subsequent terminal method call.
- A common issue amongst new users, was unexpected errors when using a terminal method twice on a query chain. This would often come up when a user called `.params()` to log out parameters and then call `.go()` on the same chain. The fix was to prevent duplicative side effects from occurring on each subsequent terminal method call. Addresses gh issue #239.

167 changes: 90 additions & 77 deletions src/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,7 @@ class Entity {
let response = await this._exec(MethodTypes.batchWrite, params, config);
if (validations.isFunction(config.parse)) {
let parsed = config.parse(config, response);
if (parsed) {
results.push(parsed);
}
results.push(parsed.data);
} else {
let {unprocessed} = this.formatBulkWriteResponse(response, config);
for (let u of unprocessed) {
Expand Down Expand Up @@ -432,7 +430,8 @@ class Entity {
await Promise.all(operation.map(async params => {
let response = await this._exec(MethodTypes.batchGet, params, config);
if (validations.isFunction(config.parse)) {
resultsAll.push(config.parse(config, response));
const parsed = config.parse(config, response);
resultsAll.push(parsed.data);
} else {
this.applyBulkGetResponseFormatting({
orderMaintainer,
Expand Down Expand Up @@ -473,23 +472,41 @@ class Entity {
ExclusiveStartKey = undefined;
}
let pages = this._normalizePagesValue(config.pages);
let max = this._normalizeLimitValue(config.limit);
let configLimit = this._normalizeNumberOptionsValue('limit', config.limit);
let configCount = this._normalizeNumberOptionsValue('count', config.count);
let max = this._safeMinimum(configLimit, configCount);
let iterations = 0;
let count = 0;
let hydratedUnprocessed = [];
const shouldHydrate = config.hydrate && method === MethodTypes.query;
do {
let limit = max === undefined
? parameters.Limit
: max - count;
let response = await this._exec(method, { ExclusiveStartKey, ...parameters, Limit: limit }, config);
ExclusiveStartKey = response.LastEvaluatedKey;
let remainingCount = configCount !== undefined
? max - count
: undefined;

let limit = configLimit !== undefined
? max - count
: undefined;

let params = { ExclusiveStartKey, ...parameters };

if (config.raw || (limit !== undefined && remainingCount === undefined)) {
params.Limit = limit;
}

let response = await this._exec(method, params, config);

response = this.formatResponse(response, parameters.IndexName, {
...config,
count: remainingCount,
includeKeys: shouldHydrate || config.includeKeys,
ignoreOwnership: shouldHydrate || config.ignoreOwnership,
_returnLastEvaluatedKeyRaw: true,
});

ExclusiveStartKey = response.lastEvaluatedKey;
delete response.lastEvaluatedKey;

if (config.raw) {
return response;
} else if (config._isCollectionQuery) {
Expand Down Expand Up @@ -655,15 +672,35 @@ class Entity {
}
}

_getLastEvaluatedKeyFromItem({indexName = TableIndex, item}) {
const indexFields = this.model.translations.keys[indexName];
const tableIndexFields = this.model.translations.keys[TableIndex];
const lastEvaluatedKey = {
[indexFields.pk]: item[indexFields.pk],
[tableIndexFields.pk]: item[tableIndexFields.pk],
}
if (indexFields.sk && item[indexFields.sk]) {
lastEvaluatedKey[indexFields.sk] = item[indexFields.sk]
}
if (tableIndexFields.sk && item[tableIndexFields.sk]) {
lastEvaluatedKey[tableIndexFields.sk] = item[tableIndexFields.sk]
}

return lastEvaluatedKey;
}

formatResponse(response, index, config = {}) {
let stackTrace;
if (!config.originalErr) {
stackTrace = new e.ElectroError(e.ErrorCodes.AWSError);
}
let lastEvaluatedKey = response.LastEvaluatedKey;
try {
let results = {};
if (validations.isFunction(config.parse)) {
results = config.parse(config, response);
const parsed = config.parse(config, response);
results = parsed.data;
lastEvaluatedKey = parsed.lastEvaluatedKey;
} else if (config.raw && !config._isPagination) {
if (response.TableName) {
results = {};
Expand All @@ -683,12 +720,27 @@ class Entity {
results = null;
}
} else if (response.Items) {
let size = typeof config.count === 'number' ? config.count : response.Items.length;
let count = 0;
let lastItem;
results = [];
for (let item of response.Items) {
for (let i = 0; i < response.Items.length; i++) {
const item = { ...response.Items[i] };
if (config.ignoreOwnership || this.ownsItem(item)) {
let record = this.model.schema.formatItemForRetrieval(item, config);
if (Object.keys(record).length > 0) {
count = count + 1;
if (count > size) {
if (lastItem) {
lastEvaluatedKey = this._getLastEvaluatedKeyFromItem({
indexName: index,
item: lastItem,
});
}
break;
}
results.push(record);
lastItem = response.Items[i];
}
}
}
Expand All @@ -704,8 +756,11 @@ class Entity {
}
}

if (config._isPagination || response.LastEvaluatedKey) {
const nextPage = this._formatReturnPager(config, response.LastEvaluatedKey);
if (config._isPagination || lastEvaluatedKey) {
const nextPage = this._formatReturnPager(config, lastEvaluatedKey);
if (config._returnLastEvaluatedKeyRaw) {
return { cursor: nextPage || null, data: results, lastEvaluatedKey };
}
return { cursor: nextPage || null, data: results };
}

Expand Down Expand Up @@ -818,16 +873,31 @@ class Entity {
return value;
}

_normalizeLimitValue(value) {
_normalizeNumberOptionsValue(option, value) {
if (value !== undefined) {
value = parseInt(value);
if (isNaN(value) || value < 1) {
throw new e.ElectroError(e.ErrorCodes.InvalidLimitOption, "Query option 'limit' must be of type 'number' and greater than zero.");
throw new e.ElectroError(e.ErrorCodes.InvalidLimitOption, `Query option '${option}' must be of type 'number' and greater than zero.`);
}
}
return value;
}

_safeMinimum(...values) {
let eligibleNumbers = [];
for (let value of values) {
if (typeof value === 'number') {
eligibleNumbers.push(value);
}
}

if (eligibleNumbers.length) {
return Math.min(...eligibleNumbers);
}

return undefined;
}

_createKeyDeconstructor(prefixes = {}, labels = [], attributes = {}) {
let {prefix, isCustom, postfix} = prefixes;
let names = [];
Expand Down Expand Up @@ -882,65 +952,6 @@ class Entity {
}
}

// _deconstructKeys(index, keyType, key, backupFacets = {}) {
// if (typeof key !== "string" || key.length === 0) {
// return null;
// }
//
// let accessPattern = this.model.translations.indexes.fromIndexToAccessPattern[index];
// let {prefix, isCustom} = this.model.prefixes[index][keyType];
// let {facets} = this.model.indexes[accessPattern][keyType];
// let names = [];
// let types = [];
// let pattern = `^${this._regexpEscape(prefix)}`;
// let labels = this.model.facets.labels[index][keyType] || [];
// for (let {name, label} of labels) {
// let attr = this.model.schema.attributes[name];
// if (attr) {
// if (isCustom) {
// pattern += `${this._regexpEscape(label === undefined ? "" : label)}(.+)`;
// } else {
// pattern += `#${this._regexpEscape(label === undefined ? name : label)}_(.+)`;
// }
// names.push(name);
// types.push(attr.type);
// }
// }
// pattern += "$";
// let regex = new RegExp(pattern, "i");
// let match = key.match(regex);
// let results = {};
// if (match) {
// for (let i = 0; i < names.length; i++) {
// let key = names[i];
// let value = match[i+1];
// let type = types[i];
// switch (type) {
// case "number":
// value = parseFloat(value);
// break;
// case "boolean":
// value = value === "true";
// break;
// }
// results[key] = value;
// }
// } else {
// if (Object.keys(backupFacets || {}).length === 0) {
// // this can occur when a scan is performed but returns no results given the current filters or record timing
// return {};
// }
// for (let facet of facets) {
// if (backupFacets[facet] === undefined) {
// throw new e.ElectroError(e.ErrorCodes.LastEvaluatedKey, 'LastEvaluatedKey contains entity that does not match the entity used to query. Use {pager: "raw"} query option.');
// } else {
// results[facet] = backupFacets[facet];
// }
// }
// }
// return results;
// }

_deconstructIndex({index = TableIndex, keys = {}} = {}) {
const hasIndex = !!this.model.translations.keys[index];
if (!hasIndex) {
Expand Down Expand Up @@ -1159,6 +1170,10 @@ class Entity {
config.params.Limit = option.limit;
}

if (typeof option.count === 'number') {
config.count = option.count;
}

if (validations.isStringHasLength(option.table)) {
config.params.TableName = option.table;
config.table = option.table;
Expand Down Expand Up @@ -1664,8 +1679,6 @@ class Entity {
const { updatedKeys, setAttributes, indexKey } = this._getPutKeys(pk, sk && sk.facets, upsert.data);
const upsertAttributes = this.model.schema.translateToFields(setAttributes);
const keyNames = Object.keys(indexKey);
// update.set(this.identifiers.entity, this.getName());
// update.set(this.identifiers.version, this.getVersion());
for (const field of [...Object.keys(upsertAttributes), ...Object.keys(updatedKeys)]) {
const value = u.getFirstDefined(upsertAttributes[field], updatedKeys[field]);
if (!keyNames.includes(field)) {
Expand Down
39 changes: 33 additions & 6 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,26 @@ class Service {

cleanseRetrievedData(index = TableIndex, entities, data = {}, config = {}) {
if (config.raw) {
return data;
if (config._returnLastEvaluatedKeyRaw) {
return {data, lastEvaluatedKey: data.LastEvaluatedKey};
} else {
return { data };
}
}
const identifiers = getEntityIdentifiers(entities);

data.Items = data.Items || [];

const results = {};
let size = typeof config.count === 'number' ? config.count : data.Items.length;
let count = 0;
let lastEvaluatedKey = data.LastEvaluatedKey;
for (let {alias} of identifiers) {
results[alias] = [];
}

for (let i = 0; i < data.Items.length; i++) {
const record = data.Items[i];
const record = { ...data.Items[i] };

if (!record) {
continue;
Expand All @@ -333,7 +340,7 @@ class Service {
let formatted;
if (config.hydrate) {
formatted = {
data: record // entities[entityAlias]._formatKeysToItem(index, record),
data: record
};
} else {
formatted = entities[entityAlias].formatResponse({Item: record}, index, {
Expand All @@ -343,9 +350,29 @@ class Service {
});
}

results[entityAlias].push(formatted.data);
if (formatted.data) {
count = count + 1;
if (count > size) {
lastEvaluatedKey = entities[entityAlias]._getLastEvaluatedKeyFromItem({
indexName: index,
item: data.Items[i - 1],
});
break;
}
results[entityAlias].push(formatted.data);
}
}

if (config._returnLastEvaluatedKeyRaw) {
return {
data: results,
lastEvaluatedKey
};
} else {
return {
data: results,
}
}
return results;
}

findKeyOwner(lastEvaluatedKey) {
Expand Down Expand Up @@ -409,7 +436,7 @@ class Service {
// expressions, // DynamoDB doesnt return what I expect it would when provided with these entity filters
parse: (options, data) => {
if (options.raw) {
return data;
return { data };
}
return this.cleanseRetrievedData(index, entities, data, options);
},
Expand Down
Loading