Skip to content

Commit fa2f989

Browse files
committed
Merge branch 'subscribe'
2 parents 5b5399c + 409879d commit fa2f989

File tree

5 files changed

+326
-39
lines changed

5 files changed

+326
-39
lines changed

imports/client.tsx

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import atob from 'atob';
2-
import { gql, useQuery, useSubscription, useApolloClient } from '@apollo/client/index.js';
2+
import { gql, useQuery, useSubscription, useApolloClient, Observable } from '@apollo/client/index.js';
33
import type { ApolloQueryResult } from '@apollo/client/index.js';
44
import { generateApolloClient, IApolloClient } from '@deep-foundation/hasura/client.js';
55
import { useLocalStore } from '@deep-foundation/store/local.js';
@@ -14,6 +14,7 @@ import { corePckg } from './core.js';
1414
import { BoolExpCan, BoolExpHandler, QueryLink, BoolExpSelector, BoolExpTree, BoolExpValue, MutationInputLink, MutationInputLinkPlain, MutationInputValue } from './client_types.js';
1515
import get from 'get-value';
1616
import {debug} from './debug.js'
17+
import { Traveler as NativeTraveler } from './traveler.js';
1718
const moduleLog = debug.extend('client');
1819

1920
const log = debug.extend('log');
@@ -268,6 +269,19 @@ export function parseJwt (token): { userId: number; role: string; roles: string[
268269
...other,
269270
};
270271
};
272+
273+
export interface Subscription {
274+
closed: boolean;
275+
unsubscribe(): void;
276+
}
277+
278+
export interface Observer<T> {
279+
start?(subscription: Subscription): any;
280+
next?(value: T): void;
281+
error?(errorValue: any): void;
282+
complete?(): void;
283+
};
284+
271285
export interface DeepClientOptions<L = Link<number>> {
272286
linkId?: number;
273287
token?: string;
@@ -301,6 +315,7 @@ export interface DeepClientOptions<L = Link<number>> {
301315

302316
export interface DeepClientResult<R> extends ApolloQueryResult<R> {
303317
error?: any;
318+
subscribe?: (observer: Observer<any>) => Subscription;
304319
}
305320

306321
export type DeepClientPackageSelector = string;
@@ -339,7 +354,8 @@ export interface DeepClientInstance<L = Link<number>> {
339354

340355
stringify(any?: any): string;
341356

342-
select<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Promise<DeepClientResult<LL[]>>;
357+
select<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Promise<DeepClientResult<LL[] | number>>;
358+
subscribe<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Observable<LL[] | number>;
343359

344360
insert<TTable extends 'links'|'numbers'|'strings'|'objects', LL = L>(objects: InsertObjects<TTable> , options?: WriteOptions<TTable>):Promise<DeepClientResult<{ id }[]>>;
345361

@@ -377,6 +393,8 @@ export interface DeepClientInstance<L = Link<number>> {
377393
useDeep: typeof useDeep;
378394
DeepProvider: typeof DeepProvider;
379395
DeepContext: typeof DeepContext;
396+
397+
Traveler(links: Link<number>[]): NativeTraveler;
380398
}
381399

382400
export interface DeepClientAuthResult {
@@ -615,6 +633,8 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
615633
['strings', 'numbers', 'objects'].includes(table) ? this.valuesSelectReturning :
616634
table === 'selectors' ? this.selectorsSelectReturning :
617635
table === 'files' ? this.filesSelectReturning : `id`);
636+
const tableNamePostfix = options?.tableNamePostfix;
637+
const aggregate = options?.aggregate;
618638

619639
// console.log(`returning: ${returning}; options.returning:${options?.returning}`)
620640
const variables = options?.variables;
@@ -625,7 +645,8 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
625645
queries: [
626646
generateQueryData({
627647
tableName: table,
628-
returning,
648+
tableNamePostfix: tableNamePostfix || aggregate ? '_aggregate' : '',
649+
returning: aggregate ? `aggregate { ${aggregate} }` : returning,
629650
variables: {
630651
...variables,
631652
...query,
@@ -634,12 +655,80 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
634655
name: name,
635656
}));
636657

637-
return { ...q, data: (q)?.data?.q0 };
658+
return { ...q, data: aggregate ? (q)?.data?.q0?.aggregate?.[aggregate] : (q)?.data?.q0 };
638659
} catch (e) {
660+
console.log(generateQueryData({
661+
tableName: table,
662+
tableNamePostfix: tableNamePostfix || aggregate ? '_aggregate' : '',
663+
returning: aggregate ? `aggregate { ${aggregate} }` : returning,
664+
variables: {
665+
...variables,
666+
...query,
667+
} })('a', 0));
639668
throw new Error(`DeepClient Select Error: ${e.message}`, { cause: e });
640669
}
641670
};
642671

672+
/**
673+
* deep.subscribe
674+
* @example
675+
* deep.subscribe({ up: { link_id: 380 } }).subscribe({ next: (links) => {}, error: (err) => {} });
676+
*/
677+
subscribe<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Observable<LL[]> {
678+
if (!exp) {
679+
return new Observable((observer) => {
680+
observer.error('!exp');
681+
});
682+
}
683+
const query = serializeQuery(exp, options?.table || 'links');
684+
const table = options?.table || this.table;
685+
const returning = options?.returning ??
686+
(table === 'links' ? this.linksSelectReturning :
687+
['strings', 'numbers', 'objects'].includes(table) ? this.valuesSelectReturning :
688+
table === 'selectors' ? this.selectorsSelectReturning :
689+
table === 'files' ? this.filesSelectReturning : `id`);
690+
const tableNamePostfix = options?.tableNamePostfix;
691+
const aggregate = options?.aggregate;
692+
693+
// console.log(`returning: ${returning}; options.returning:${options?.returning}`)
694+
const variables = options?.variables;
695+
const name = options?.name || this.defaultSelectName;
696+
697+
try {
698+
const apolloObservable = this.apolloClient.subscribe({
699+
...generateQuery({
700+
operation: 'subscription',
701+
queries: [
702+
generateQueryData({
703+
tableName: table,
704+
tableNamePostfix: tableNamePostfix || aggregate ? '_aggregate' : '',
705+
returning: returning || aggregate ? `aggregate { ${aggregate} }` : returning,
706+
variables: {
707+
...variables,
708+
...query,
709+
} }),
710+
],
711+
name: name,
712+
}),
713+
});
714+
715+
const observable = new Observable((observer) => {
716+
const subscription = apolloObservable.subscribe({
717+
next: (data: any) => {
718+
observer.next(aggregate ? data?.q0?.aggregate?.[aggregate] : data?.q0);
719+
},
720+
error: (error) => observer.error(error),
721+
});
722+
return () => subscription.unsubscribe();
723+
});
724+
725+
// @ts-ignore
726+
return observable;
727+
} catch (e) {
728+
throw new Error(`DeepClient Subscription Error: ${e.message}`, { cause: e });
729+
}
730+
};
731+
643732
async insert<TTable extends 'links'|'numbers'|'strings'|'objects', LL = L>(objects: InsertObjects<TTable>, options?: WriteOptions<TTable>):Promise<DeepClientResult<{ id }[]>> {
644733
const _objects = Object.prototype.toString.call(objects) === '[object Array]' ? objects : [objects];
645734
checkAndFillShorts(_objects);
@@ -997,6 +1086,10 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
9971086
}
9981087
return await import(path);
9991088
}
1089+
1090+
Traveler(links: Link<number>[]) {
1091+
return new NativeTraveler(this, links);
1092+
};
10001093
}
10011094

10021095
export const JWT = gql`query JWT($linkId: Int) {
@@ -1074,6 +1167,7 @@ export function useDeepQuery<Table extends 'links'|'numbers'|'strings'|'objects'
10741167
query: QueryLink,
10751168
options?: {
10761169
table?: Table;
1170+
tableNamePostfix?: string;
10771171
returning?: string;
10781172
variables?: any;
10791173
name?: string;
@@ -1118,6 +1212,7 @@ export function useDeepSubscription<Table extends 'links'|'numbers'|'strings'|'o
11181212
query: QueryLink,
11191213
options?: {
11201214
table?: Table;
1215+
tableNamePostfix?: string;
11211216
returning?: string;
11221217
variables?: any;
11231218
name?: string;
@@ -1198,9 +1293,11 @@ export type InsertObjects<TTable extends Table> = (
11981293

11991294
export type Options<TTable extends Table> = {
12001295
table?: TTable;
1296+
tableNamePostfix?: string;
12011297
returning?: string;
12021298
variables?: any;
12031299
name?: string;
1300+
aggregate?: 'count' | 'sum' | 'avg' | 'min' | 'max';
12041301
};
12051302

12061303
export type ReadOptions<TTable extends Table> = Options<TTable>;

imports/gql/query.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const fieldsInputs = (tableName): IGenerateQueryFieldTypes => ({
1515
});
1616

1717
export interface IGenerateQueryDataOptions {
18+
tableNamePostfix?: string;
1819
tableName: string;
1920
operation?: 'query' | 'subscription';
2021
queryName?: string;
@@ -44,17 +45,18 @@ export interface IGenerateQueryDataResult extends IGenerateQueryDataOptions {
4445

4546
export const generateQueryData = ({
4647
tableName,
48+
tableNamePostfix = '',
4749
operation = 'query',
4850
queryName = `${tableName}`,
4951
returning = `id`,
5052
variables,
5153
}: IGenerateQueryDataOptions): IGenerateQueryDataBuilder => {
52-
log('generateQuery', { tableName, operation, queryName, returning, variables });
54+
log('generateQuery', { tableName, tableNamePostfix, operation, queryName, returning, variables });
5355
const fields = ['distinct_on', 'limit', 'offset', 'order_by', 'where'];
5456
const fieldTypes = fieldsInputs(tableName);
5557

5658
return (alias: string, index: number): IGenerateQueryDataResult => {
57-
log('generateQueryBuilder', { tableName, operation, queryName, returning, variables, alias, index });
59+
log('generateQueryBuilder', { tableName, tableNamePostfix, operation, queryName, returning, variables, alias, index });
5860
const defs = [];
5961
const args = [];
6062
for (let f = 0; f < fields.length; f++) {
@@ -72,8 +74,9 @@ export const generateQueryData = ({
7274
}
7375
const result = {
7476
tableName,
77+
tableNamePostfix,
7578
operation,
76-
queryName,
79+
queryName: queryName+tableNamePostfix,
7780
returning,
7881
variables,
7982
resultReturning: returning,

imports/minilinks.ts

Lines changed: 69 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import _remove from 'lodash/remove.js';
22
import _isEqual from 'lodash/isEqual.js';
33
import _isEqualWith from 'lodash/isEqualWith.js';
4+
import _mean from 'lodash/mean.js';
5+
import _sum from 'lodash/sum.js';
6+
import _min from 'lodash/min.js';
7+
import _max from 'lodash/max.js';
48
import EventEmitter from 'events';
59
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
610
import Debug from 'debug';
711
import { inherits } from 'util';
812
import { minilinksQuery, minilinksQueryIs } from './minilinks-query.js';
913
import { QueryLink } from './client_types.js';
1014
import { useDebounceCallback } from '@react-hook/debounce';
15+
import { Observable } from '@apollo/client/index.js';
1116

1217
const debug = Debug('deeplinks:minilinks');
1318
const log = debug.extend('log');
@@ -42,6 +47,11 @@ export interface LinkHashFields {
4247

4348
export interface Link<Ref extends number> extends LinkPlain<Ref>, LinkRelations<Link<Ref>>, LinkHashFields {}
4449

50+
export type MinilinksQueryOptionAggregate = 'count' | 'sum' | 'avg' | 'min' | 'max';
51+
export interface MinilinksQueryOptions<A = MinilinksQueryOptionAggregate> {
52+
aggregate?: A;
53+
}
54+
4555
export interface MinilinksResult<Link> {
4656
links: Link[];
4757
types: { [id: number]: Link[] };
@@ -51,7 +61,9 @@ export interface MinilinksResult<Link> {
5161
byType: { [id: number]: Link[] };
5262
options: MinilinksGeneratorOptions;
5363
emitter: EventEmitter;
54-
query(query: QueryLink | number): Link[];
64+
query(query: QueryLink | number): Link[] | any;
65+
select(query: QueryLink | number): Link[] | any;
66+
subscribe(query: QueryLink | number): Observable<Link[] | any>;
5567
add(linksArray: any[]): {
5668
anomalies?: MinilinkError[];
5769
errors?: MinilinkError[];
@@ -203,8 +215,45 @@ export class MinilinkCollection<MGO extends MinilinksGeneratorOptions = typeof M
203215
links: L[] = [];
204216
options: MGO;
205217
emitter: EventEmitter;
206-
query(query: QueryLink | number): L[] {
207-
return minilinksQuery<L>(query, this);
218+
219+
query<A>(query: QueryLink | number, options?: MinilinksQueryOptions<A>): A extends string ? any : L[] {
220+
const result = minilinksQuery<L>(query, this);
221+
if (options?.aggregate === 'count') return result?.length as any;
222+
if (options?.aggregate === 'avg') return _mean(result?.map(l => l?.value?.value)) as any;
223+
if (options?.aggregate === 'sum') return _sum(result?.map(l => l?.value?.value)) as any;
224+
if (options?.aggregate === 'min') return _min(result?.map(l => l?.value?.value)) as any;
225+
if (options?.aggregate === 'max') return _max(result?.map(l => l?.value?.value)) as any;
226+
return result;
227+
}
228+
select(query: QueryLink | number, options?: MinilinksQueryOptions): L[] | any {
229+
return this.query(query, options);
230+
}
231+
232+
/**
233+
* minilinks.subscribe
234+
* @example
235+
* minilinks.subscribe({ type_id: 2 }).subscribe({ next: (links) => {}, error: (err) => {} });
236+
*/
237+
subscribe(query: QueryLink | number): Observable<L[] | any> {
238+
const ml = this;
239+
return new Observable((observer) => {
240+
let prev = ml.query(query);
241+
observer.next(prev);
242+
let listener = (oldL, newL) => {
243+
const data = ml.query(query);
244+
if (!_isEqual(prev, data)) {
245+
observer.next(data);
246+
}
247+
};
248+
ml.emitter.on('added', listener);
249+
ml.emitter.on('updated', listener);
250+
ml.emitter.on('removed', listener);
251+
return () => {
252+
ml.emitter.removeListener('added', listener);
253+
ml.emitter.removeListener('updated', listener);
254+
ml.emitter.removeListener('removed', listener);
255+
}
256+
});
208257
}
209258
add(linksArray: any[]): {
210259
anomalies?: MinilinkError[];
@@ -592,36 +641,25 @@ export function useMinilinksQuery<L extends Link<number>>(ml, query: QueryLink |
592641
* Recalculates when data in minilinks changes. (Take query into useMemo!).
593642
*/
594643
export function useMinilinksSubscription<L extends Link<number>>(ml, query: QueryLink | number) {
595-
// console.log('54353246234562346435')
596-
const listenerRef = useRef<any>();
597-
const dRef = useRef<any>();
598644
const [d, setD] = useState();
599-
const qRef = useRef<any>(query);
600-
qRef.current = query;
645+
const sRef = useRef<any>();
646+
const qPrevRef = useRef<any>(query);
647+
const q = useMemo(() => _isEqual(query, qPrevRef.current) ? qPrevRef.current : query, [query]);
648+
qPrevRef.current = q;
601649
useEffect(() => {
602-
if (listenerRef.current) ml.emitter.removeListener('added', listenerRef.current);
603-
if (listenerRef.current) ml.emitter.removeListener('updated', listenerRef.current);
604-
if (listenerRef.current) ml.emitter.removeListener('removed', listenerRef.current);
605-
listenerRef.current = (oldL, newL) => {
606-
const prev = d || dRef.current;
607-
const data = ml.query(qRef.current);
608-
if (!_isEqual(prev, data)) {
609-
setD(data);
610-
}
611-
};
612-
ml.emitter.on('added', listenerRef.current);
613-
ml.emitter.on('updated', listenerRef.current);
614-
ml.emitter.on('removed', listenerRef.current);
650+
if (sRef.current) sRef.current.unsubscribe();
651+
const obs = ml.subscribe(q);
652+
const sub = sRef.current = obs.subscribe({
653+
next: (links) => {
654+
setD(links);
655+
},
656+
error: (error) => {
657+
throw new Error(error);
658+
},
659+
});
615660
return () => {
616-
ml.emitter.removeListener('added', listenerRef.current);
617-
ml.emitter.removeListener('updated', listenerRef.current);
618-
ml.emitter.removeListener('removed', listenerRef.current);
661+
sub.unsubscribe();
619662
}
620-
}, []);
621-
// const iterationsInterval = setInterval(() => {
622-
// setIteration((i: number) => i === Number.MAX_SAFE_INTEGER ? 0 : i+1)
623-
// }, 1000);
624-
// return () => clearInterval(iterationsInterval);
625-
const data = dRef.current = d ? d : ml.query(query);
626-
return data;
663+
}, [q]);
664+
return d || ml.query(query);
627665
};

0 commit comments

Comments
 (0)