Skip to content

Commit

Permalink
deep/ml.subscribe, Traveler, aggregation: count|avg|sum|min|max
Browse files Browse the repository at this point in the history
  • Loading branch information
ivansglazunov committed Nov 15, 2023
1 parent 532e999 commit 1443732
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 35 deletions.
105 changes: 101 additions & 4 deletions imports/client.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import atob from 'atob';
import { gql, useQuery, useSubscription, useApolloClient } from '@apollo/client/index.js';
import { gql, useQuery, useSubscription, useApolloClient, Observable } from '@apollo/client/index.js';
import type { ApolloQueryResult } from '@apollo/client/index.js';
import { generateApolloClient, IApolloClient } from '@deep-foundation/hasura/client.js';
import { useLocalStore } from '@deep-foundation/store/local.js';
Expand All @@ -14,6 +14,7 @@ import { corePckg } from './core.js';
import { BoolExpCan, BoolExpHandler, QueryLink, BoolExpSelector, BoolExpTree, BoolExpValue, MutationInputLink, MutationInputLinkPlain, MutationInputValue } from './client_types.js';
import get from 'get-value';
import {debug} from './debug.js'
import { Traveler as NativeTraveler } from './traveler.js';
const moduleLog = debug.extend('client');

const log = debug.extend('log');
Expand Down Expand Up @@ -268,6 +269,19 @@ export function parseJwt (token): { userId: number; role: string; roles: string[
...other,
};
};

export interface Subscription {
closed: boolean;
unsubscribe(): void;
}

export interface Observer<T> {
start?(subscription: Subscription): any;
next?(value: T): void;
error?(errorValue: any): void;
complete?(): void;
};

export interface DeepClientOptions<L = Link<number>> {
linkId?: number;
token?: string;
Expand Down Expand Up @@ -301,6 +315,7 @@ export interface DeepClientOptions<L = Link<number>> {

export interface DeepClientResult<R> extends ApolloQueryResult<R> {
error?: any;
subscribe?: (observer: Observer<any>) => Subscription;
}

export type DeepClientPackageSelector = string;
Expand Down Expand Up @@ -339,7 +354,8 @@ export interface DeepClientInstance<L = Link<number>> {

stringify(any?: any): string;

select<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Promise<DeepClientResult<LL[]>>;
select<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Promise<DeepClientResult<LL[] | number>>;
subscribe<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Observable<LL[] | number>;

insert<TTable extends 'links'|'numbers'|'strings'|'objects', LL = L>(objects: InsertObjects<TTable> , options?: WriteOptions<TTable>):Promise<DeepClientResult<{ id }[]>>;

Expand Down Expand Up @@ -377,6 +393,8 @@ export interface DeepClientInstance<L = Link<number>> {
useDeep: typeof useDeep;
DeepProvider: typeof DeepProvider;
DeepContext: typeof DeepContext;

Traveler(links: Link<number>[]): NativeTraveler;
}

export interface DeepClientAuthResult {
Expand Down Expand Up @@ -615,6 +633,8 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
['strings', 'numbers', 'objects'].includes(table) ? this.valuesSelectReturning :
table === 'selectors' ? this.selectorsSelectReturning :
table === 'files' ? this.filesSelectReturning : `id`);
const tableNamePostfix = options?.tableNamePostfix;
const aggregate = options?.aggregate;

// console.log(`returning: ${returning}; options.returning:${options?.returning}`)
const variables = options?.variables;
Expand All @@ -625,7 +645,8 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
queries: [
generateQueryData({
tableName: table,
returning,
tableNamePostfix: tableNamePostfix || aggregate ? '_aggregate' : '',
returning: aggregate ? `aggregate { ${aggregate} }` : returning,
variables: {
...variables,
...query,
Expand All @@ -634,12 +655,80 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
name: name,
}));

return { ...q, data: (q)?.data?.q0 };
return { ...q, data: aggregate ? (q)?.data?.q0?.aggregate?.[aggregate] : (q)?.data?.q0 };
} catch (e) {
console.log(generateQueryData({
tableName: table,
tableNamePostfix: tableNamePostfix || aggregate ? '_aggregate' : '',
returning: aggregate ? `aggregate { ${aggregate} }` : returning,
variables: {
...variables,
...query,
} })('a', 0));
throw new Error(`DeepClient Select Error: ${e.message}`, { cause: e });
}
};

/**
* deep.subscribe
* @example
* deep.subscribe({ up: { link_id: 380 } }).subscribe({ next: (links) => {}, error: (err) => {} });
*/
subscribe<TTable extends 'links'|'numbers'|'strings'|'objects'|'can'|'selectors'|'tree'|'handlers', LL = L>(exp: Exp<TTable>, options?: ReadOptions<TTable>): Observable<LL[]> {
if (!exp) {
return new Observable((observer) => {
observer.error('!exp');
});
}
const query = serializeQuery(exp, options?.table || 'links');
const table = options?.table || this.table;
const returning = options?.returning ??
(table === 'links' ? this.linksSelectReturning :
['strings', 'numbers', 'objects'].includes(table) ? this.valuesSelectReturning :
table === 'selectors' ? this.selectorsSelectReturning :
table === 'files' ? this.filesSelectReturning : `id`);
const tableNamePostfix = options?.tableNamePostfix;
const aggregate = options?.aggregate;

// console.log(`returning: ${returning}; options.returning:${options?.returning}`)
const variables = options?.variables;
const name = options?.name || this.defaultSelectName;

try {
const apolloObservable = this.apolloClient.subscribe({
...generateQuery({
operation: 'subscription',
queries: [
generateQueryData({
tableName: table,
tableNamePostfix: tableNamePostfix || aggregate ? '_aggregate' : '',
returning: returning || aggregate ? `aggregate { ${aggregate} }` : returning,
variables: {
...variables,
...query,
} }),
],
name: name,
}),
});

const observable = new Observable((observer) => {
const subscription = apolloObservable.subscribe({
next: (data: any) => {
observer.next(aggregate ? data?.q0?.aggregate?.[aggregate] : data?.q0);
},
error: (error) => observer.error(error),
});
return () => subscription.unsubscribe();
});

// @ts-ignore
return observable;
} catch (e) {
throw new Error(`DeepClient Subscription Error: ${e.message}`, { cause: e });
}
};

async insert<TTable extends 'links'|'numbers'|'strings'|'objects', LL = L>(objects: InsertObjects<TTable>, options?: WriteOptions<TTable>):Promise<DeepClientResult<{ id }[]>> {
const _objects = Object.prototype.toString.call(objects) === '[object Array]' ? objects : [objects];
checkAndFillShorts(_objects);
Expand Down Expand Up @@ -997,6 +1086,10 @@ export class DeepClient<L = Link<number>> implements DeepClientInstance<L> {
}
return await import(path);
}

Traveler(links: Link<number>[]) {
return new NativeTraveler(this, links);
};
}

export const JWT = gql`query JWT($linkId: Int) {
Expand Down Expand Up @@ -1074,6 +1167,7 @@ export function useDeepQuery<Table extends 'links'|'numbers'|'strings'|'objects'
query: QueryLink,
options?: {
table?: Table;
tableNamePostfix?: string;
returning?: string;
variables?: any;
name?: string;
Expand Down Expand Up @@ -1118,6 +1212,7 @@ export function useDeepSubscription<Table extends 'links'|'numbers'|'strings'|'o
query: QueryLink,
options?: {
table?: Table;
tableNamePostfix?: string;
returning?: string;
variables?: any;
name?: string;
Expand Down Expand Up @@ -1198,9 +1293,11 @@ export type InsertObjects<TTable extends Table> = (

export type Options<TTable extends Table> = {
table?: TTable;
tableNamePostfix?: string;
returning?: string;
variables?: any;
name?: string;
aggregate?: 'count' | 'sum' | 'avg' | 'min' | 'max';
};

export type ReadOptions<TTable extends Table> = Options<TTable>;
Expand Down
9 changes: 6 additions & 3 deletions imports/gql/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const fieldsInputs = (tableName): IGenerateQueryFieldTypes => ({
});

export interface IGenerateQueryDataOptions {
tableNamePostfix?: string;
tableName: string;
operation?: 'query' | 'subscription';
queryName?: string;
Expand Down Expand Up @@ -44,17 +45,18 @@ export interface IGenerateQueryDataResult extends IGenerateQueryDataOptions {

export const generateQueryData = ({
tableName,
tableNamePostfix = '',
operation = 'query',
queryName = `${tableName}`,
returning = `id`,
variables,
}: IGenerateQueryDataOptions): IGenerateQueryDataBuilder => {
log('generateQuery', { tableName, operation, queryName, returning, variables });
log('generateQuery', { tableName, tableNamePostfix, operation, queryName, returning, variables });
const fields = ['distinct_on', 'limit', 'offset', 'order_by', 'where'];
const fieldTypes = fieldsInputs(tableName);

return (alias: string, index: number): IGenerateQueryDataResult => {
log('generateQueryBuilder', { tableName, operation, queryName, returning, variables, alias, index });
log('generateQueryBuilder', { tableName, tableNamePostfix, operation, queryName, returning, variables, alias, index });
const defs = [];
const args = [];
for (let f = 0; f < fields.length; f++) {
Expand All @@ -72,8 +74,9 @@ export const generateQueryData = ({
}
const result = {
tableName,
tableNamePostfix,
operation,
queryName,
queryName: queryName+tableNamePostfix,
returning,
variables,
resultReturning: returning,
Expand Down
89 changes: 61 additions & 28 deletions imports/minilinks.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import _remove from 'lodash/remove.js';
import _isEqual from 'lodash/isEqual.js';
import _isEqualWith from 'lodash/isEqualWith.js';
import _mean from 'lodash/mean.js';
import _sum from 'lodash/sum.js';
import _min from 'lodash/min.js';
import _max from 'lodash/max.js';
import EventEmitter from 'events';
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import Debug from 'debug';
import { inherits } from 'util';
import { minilinksQuery, minilinksQueryIs } from './minilinks-query.js';
import { QueryLink } from './client_types.js';
import { useDebounceCallback } from '@react-hook/debounce';
import { Observable } from '@apollo/client';

const debug = Debug('deeplinks:minilinks');
const log = debug.extend('log');
Expand Down Expand Up @@ -42,6 +47,11 @@ export interface LinkHashFields {

export interface Link<Ref extends number> extends LinkPlain<Ref>, LinkRelations<Link<Ref>>, LinkHashFields {}

export type MinilinksQueryOptionAggregate = 'count' | 'sum' | 'avg' | 'min' | 'max';
export interface MinilinksQueryOptions<A = MinilinksQueryOptionAggregate> {
aggregate?: A;
}

export interface MinilinksResult<Link> {
links: Link[];
types: { [id: number]: Link[] };
Expand All @@ -51,7 +61,9 @@ export interface MinilinksResult<Link> {
byType: { [id: number]: Link[] };
options: MinilinksGeneratorOptions;
emitter: EventEmitter;
query(query: QueryLink | number): Link[];
query(query: QueryLink | number): Link[] | any;
select(query: QueryLink | number): Link[] | any;
subscribe(query: QueryLink | number): Observable<Link[] | any>;
add(linksArray: any[]): {
anomalies?: MinilinkError[];
errors?: MinilinkError[];
Expand Down Expand Up @@ -203,8 +215,45 @@ export class MinilinkCollection<MGO extends MinilinksGeneratorOptions = typeof M
links: L[] = [];
options: MGO;
emitter: EventEmitter;
query(query: QueryLink | number): L[] {
return minilinksQuery<L>(query, this);

query<A>(query: QueryLink | number, options?: MinilinksQueryOptions<A>): A extends string ? any : L[] {
const result = minilinksQuery<L>(query, this);
if (options?.aggregate === 'count') return result?.length as any;
if (options?.aggregate === 'avg') return _mean(result?.map(l => l?.value?.value)) as any;
if (options?.aggregate === 'sum') return _sum(result?.map(l => l?.value?.value)) as any;
if (options?.aggregate === 'min') return _min(result?.map(l => l?.value?.value)) as any;
if (options?.aggregate === 'max') return _max(result?.map(l => l?.value?.value)) as any;
return result;
}
select(query: QueryLink | number, options?: MinilinksQueryOptions): L[] | any {
return this.query(query, options);
}

/**
* minilinks.subscribe
* @example
* minilinks.subscribe({ type_id: 2 }).subscribe({ next: (links) => {}, error: (err) => {} });
*/
subscribe(query: QueryLink | number): Observable<L[] | any> {
const ml = this;
return new Observable((observer) => {
let prev = ml.query(query);
observer.next(prev);
let listener = (oldL, newL) => {
const data = ml.query(query);
if (!_isEqual(prev, data)) {
observer.next(data);
}
};
ml.emitter.on('added', listener);
ml.emitter.on('updated', listener);
ml.emitter.on('removed', listener);
return () => {
ml.emitter.removeListener('added', listener);
ml.emitter.removeListener('updated', listener);
ml.emitter.removeListener('removed', listener);
}
});
}
add(linksArray: any[]): {
anomalies?: MinilinkError[];
Expand Down Expand Up @@ -592,36 +641,20 @@ export function useMinilinksQuery<L extends Link<number>>(ml, query: QueryLink |
* Recalculates when data in minilinks changes. (Take query into useMemo!).
*/
export function useMinilinksSubscription<L extends Link<number>>(ml, query: QueryLink | number) {
// console.log('54353246234562346435')
const listenerRef = useRef<any>();
const dRef = useRef<any>();
const observerRef = useRef<any>();
const [d, setD] = useState();
const qRef = useRef<any>(query);
qRef.current = query;
useEffect(() => {
if (listenerRef.current) ml.emitter.removeListener('added', listenerRef.current);
if (listenerRef.current) ml.emitter.removeListener('updated', listenerRef.current);
if (listenerRef.current) ml.emitter.removeListener('removed', listenerRef.current);
listenerRef.current = (oldL, newL) => {
const prev = d || dRef.current;
const data = ml.query(qRef.current);
if (!_isEqual(prev, data)) {
setD(data);
}
};
ml.emitter.on('added', listenerRef.current);
ml.emitter.on('updated', listenerRef.current);
ml.emitter.on('removed', listenerRef.current);
!!observerRef.current && observerRef.current.unsubscribe();
const obs = observerRef.current = ml.subscribe(qRef.current);
const sub = obs.subscribe({
next: (links) => setD(links),
error: (error) => { throw new Error(error) },
});
return () => {
ml.emitter.removeListener('added', listenerRef.current);
ml.emitter.removeListener('updated', listenerRef.current);
ml.emitter.removeListener('removed', listenerRef.current);
sub.unsubscribe();
}
}, []);
// const iterationsInterval = setInterval(() => {
// setIteration((i: number) => i === Number.MAX_SAFE_INTEGER ? 0 : i+1)
// }, 1000);
// return () => clearInterval(iterationsInterval);
const data = dRef.current = d ? d : ml.query(query);
return data;
return d || ml.query(query);
};
Loading

0 comments on commit 1443732

Please sign in to comment.