Skip to content

Commit

Permalink
Merge pull request #718 from tchak/observe-query
Browse files Browse the repository at this point in the history
Implement live query on @orbit/record-cache
  • Loading branch information
dgeb authored Mar 7, 2020
2 parents 8387c6a + 1a815bc commit 89c9387
Show file tree
Hide file tree
Showing 9 changed files with 940 additions and 0 deletions.
19 changes: 19 additions & 0 deletions packages/@orbit/record-cache/src/async-record-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from './record-accessor';
import { PatchResult } from './patch-result';
import { QueryResult, QueryResultData } from './query-result';
import { AsyncLiveQuery } from './live-query/async-live-query';

const { assert } = Orbit;

Expand Down Expand Up @@ -246,6 +247,24 @@ export abstract class AsyncRecordCache implements Evented, AsyncRecordAccessor {
return result;
}

liveQuery(
queryOrExpressions: QueryOrExpressions,
options?: object,
id?: string
): AsyncLiveQuery {
const query = buildQuery(
queryOrExpressions,
options,
id,
this.queryBuilder
);

return new AsyncLiveQuery({
cache: this,
query
});
}

/////////////////////////////////////////////////////////////////////////////
// Protected methods
/////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions packages/@orbit/record-cache/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ export * from './record-accessor';

export * from './async-record-cache';
export * from './async-operation-processor';
export * from './live-query/async-live-query';

export * from './sync-record-cache';
export * from './sync-operation-processor';
export * from './live-query/sync-live-query';

// Operators
export * from './operators/async-inverse-patch-operators';
Expand Down
57 changes: 57 additions & 0 deletions packages/@orbit/record-cache/src/live-query/async-live-query.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Schema, Query } from '@orbit/data';
import { QueryResult } from '../query-result';
import { AsyncRecordCache } from '../async-record-cache';
import { LiveQuery, LiveQuerySettings } from './live-query';

export interface AsyncLiveQueryUpdateSettings {
cache: AsyncRecordCache;
query: Query;
}

export class AsyncLiveQueryUpdate {
private _cache: AsyncRecordCache;
private _query: Query;

constructor(settings: AsyncLiveQueryUpdateSettings) {
this._cache = settings.cache;
this._query = settings.query;
}

query(): Promise<QueryResult> {
return this._cache.query(this._query);
}
}

export interface AsyncLiveQuerySettings extends LiveQuerySettings {
cache: AsyncRecordCache;
}

export class AsyncLiveQuery extends LiveQuery {
protected cache: AsyncRecordCache;

protected get schema(): Schema {
return this.cache.schema;
}

private get _update() {
return new AsyncLiveQueryUpdate({
cache: this.cache,
query: this._query
});
}

constructor(settings: AsyncLiveQuerySettings) {
super(settings);
this.cache = settings.cache;
}

async query(): Promise<QueryResult> {
return this._update.query();
}

subscribe(cb: (update: AsyncLiveQueryUpdate) => void): () => void {
return this._subscribe(() => {
cb(this._update);
});
}
}
182 changes: 182 additions & 0 deletions packages/@orbit/record-cache/src/live-query/live-query.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import Orbit, { Evented } from '@orbit/core';
import {
QueryExpression,
FindRecord,
FindRecords,
FindRelatedRecord,
FindRelatedRecords,
equalRecordIdentities,
Query,
Schema,
RecordOperation
} from '@orbit/data';

import { RecordChange, recordOperationChange } from './record-change';

const { assert } = Orbit;

export interface LiveQuerySettings {
query: Query;
}

export abstract class LiveQuery {
protected cache: Evented;
protected schema: Schema;

protected _query: Query;
protected _subscribe(onNext: () => void): () => void {
const execute = onceTick(onNext);

const unsubscribePatch = this.cache.on(
'patch',
(operation: RecordOperation) => {
if (this.operationRelevantForQuery(operation)) {
execute();
}
}
);

const unsubscribeReset = this.cache.on('reset', () => {
execute();
});

function unsubscribe() {
cancelTick(execute);
unsubscribePatch();
unsubscribeReset();
}

return unsubscribe;
}

constructor(settings: LiveQuerySettings) {
assert(
'Only single expression queries are supported on LiveQuery',
settings.query.expressions.length === 1
);
this._query = settings.query;
}

operationRelevantForQuery(operation: RecordOperation): boolean {
const change = recordOperationChange(operation);
const expression = this._query.expressions[0];
return this.queryExpressionRelevantForChange(expression, change);
}

protected queryExpressionRelevantForChange(
expression: QueryExpression,
change: RecordChange
): boolean {
switch (expression.op) {
case 'findRecord':
return this.findRecordQueryExpressionRelevantForChange(
expression as FindRecord,
change
);
case 'findRecords':
return this.findRecordsQueryExpressionRelevantForChange(
expression as FindRecords,
change
);
case 'findRelatedRecord':
return this.findRelatedRecordQueryExpressionRelevantForChange(
expression as FindRelatedRecord,
change
);
case 'findRelatedRecords':
return this.findRelatedRecordsQueryExpressionRelevantForChange(
expression as FindRelatedRecords,
change
);
default:
return true;
}
}

protected findRecordQueryExpressionRelevantForChange(
expression: FindRecord,
change: RecordChange
): boolean {
return equalRecordIdentities(expression.record, change);
}

protected findRecordsQueryExpressionRelevantForChange(
expression: FindRecords,
change: RecordChange
): boolean {
if (expression.type) {
return expression.type === change.type;
} else if (expression.records) {
for (let record of expression.records) {
if (record.type === change.type) {
return true;
}
}
return false;
}
return true;
}

protected findRelatedRecordQueryExpressionRelevantForChange(
expression: FindRelatedRecord,
change: RecordChange
): boolean {
return (
equalRecordIdentities(expression.record, change) &&
(change.relationships.includes(expression.relationship) || change.remove)
);
}

protected findRelatedRecordsQueryExpressionRelevantForChange(
expression: FindRelatedRecords,
change: RecordChange
): boolean {
const { type } = this.schema.getRelationship(
expression.record.type,
expression.relationship
);

if (Array.isArray(type) && type.find(type => type === change.type)) {
return true;
} else if (type === change.type) {
return true;
}

return (
equalRecordIdentities(expression.record, change) &&
(change.relationships.includes(expression.relationship) || change.remove)
);
}
}

const isNode =
typeof process === 'object' && typeof process.nextTick === 'function';
let resolvedPromise: Promise<void>;
const nextTick = isNode
? function(fn: () => void) {
if (!resolvedPromise) {
resolvedPromise = Promise.resolve();
}
resolvedPromise.then(() => {
process.nextTick(fn);
});
}
: window.setImmediate || setTimeout;

function onceTick(fn: () => void) {
return function tick() {
if (!ticks.has(tick)) {
ticks.add(tick);
nextTick(() => {
fn();
cancelTick(tick);
});
}
};
}

function cancelTick(tick: () => void) {
ticks.delete(tick);
}

const ticks = new WeakSet();
68 changes: 68 additions & 0 deletions packages/@orbit/record-cache/src/live-query/record-change.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import {
Record,
cloneRecordIdentity,
RecordIdentity,
RecordOperation
} from '@orbit/data';

export interface RecordChange extends RecordIdentity {
keys: string[];
attributes: string[];
relationships: string[];
meta: string[];
links: string[];
remove: boolean;
}

export function recordOperationChange(
operation: RecordOperation
): RecordChange {
const record = operation.record as Record;
const change: RecordChange = {
...cloneRecordIdentity(record),
remove: false,
keys: [],
attributes: [],
relationships: [],
meta: [],
links: []
};

switch (operation.op) {
case 'addRecord':
case 'updateRecord':
if (record.keys) {
change.keys = Object.keys(record.keys);
}
if (record.attributes) {
change.attributes = Object.keys(record.attributes);
}
if (record.relationships) {
change.relationships = Object.keys(record.relationships);
}
if (record.meta) {
change.meta = Object.keys(record.meta);
}
if (record.links) {
change.links = Object.keys(record.links);
}
break;
case 'replaceAttribute':
change.attributes = [operation.attribute];
break;
case 'replaceKey':
change.keys = [operation.key];
break;
case 'replaceRelatedRecord':
case 'replaceRelatedRecords':
case 'addToRelatedRecords':
case 'removeFromRelatedRecords':
change.relationships = [operation.relationship];
break;
case 'removeRecord':
change.remove = true;
break;
}

return change;
}
Loading

0 comments on commit 89c9387

Please sign in to comment.