Skip to content

Commit

Permalink
Subscription (#46)
Browse files Browse the repository at this point in the history
* graphql subscriptions for events

* correct table name in graphql_subscription
  • Loading branch information
ramilexe authored Mar 26, 2021
1 parent 3342d6f commit 7dd44f9
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
80 changes: 80 additions & 0 deletions src/migrations/1616693016988-GraphqlSubscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import {MigrationInterface, QueryRunner} from "typeorm";

export class GraphqlSubscription1616693016988 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE FUNCTION public.graphql_subscription() returns TRIGGER as $$
declare
event_name text = TG_ARGV[0];
table_name text = TG_ARGV[1];
attribute text = TG_ARGV[2];
id text;
begin
execute 'select $1.' || quote_ident(attribute)
using new
into id;
perform pg_notify('postgraphile:' || event_name,
json_build_object(
'__node__', json_build_array(
table_name,
id
)
)::text
);
return new;
end;
$$ language plpgsql;
CREATE TRIGGER header_cids_ai
after INSERT ON eth.header_cids
for each row
execute procedure graphql_subscription('header_cids', 'header_cids', 'id');
CREATE TRIGGER receipt_cids_ai
after INSERT ON eth.receipt_cids
for each row
execute procedure graphql_subscription('receipt_cids', 'receipt_cids', 'id');
CREATE TRIGGER state_accounts_ai
after INSERT ON eth.state_accounts
for each row
execute procedure graphql_subscription('state_accounts', 'state_accounts', 'id');
CREATE TRIGGER state_cids_ai
after INSERT ON eth.state_cids
for each row
execute procedure graphql_subscription('state_cids', 'state_cids', 'id');
CREATE TRIGGER storage_cids_ai
after INSERT ON eth.storage_cids
for each row
execute procedure graphql_subscription('storage_cids', 'storage_cids', 'id');
CREATE TRIGGER transaction_cids_ai
after INSERT ON eth.transaction_cids
for each row
execute procedure graphql_subscription('transaction_cids', 'transaction_cids', 'id');
CREATE TRIGGER uncle_cids_ai
after INSERT ON eth.uncle_cids
for each row
execute procedure graphql_subscription('uncle_cids', 'uncle_cids', 'id');
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
DROP TRIGGER uncle_cids_ai ON eth.uncle_cids;
DROP TRIGGER transaction_cids_ai ON eth.transaction_cids;
DROP TRIGGER storage_cids_ai ON eth.storage_cids;
DROP TRIGGER state_cids_ai ON eth.state_cids;
DROP TRIGGER state_accounts_ai ON eth.state_accounts;
DROP TRIGGER receipt_cids_ai ON eth.receipt_cids;
DROP TRIGGER header_cids_ai ON eth.header_cids;
DROP FUNCTION public.graphql_subscription();
`);
}

}
26 changes: 21 additions & 5 deletions src/services/dataService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,13 @@ VALUES
}
}

private static _getTableName({ contractId, type = 'event', id}): string {
return `data.contract_id_${contractId}_${type}_id_${id}`;
private static _getTableName({ contractId, type = 'event', id}, withSchema: boolean = true): string {
let tableName = `contract_id_${contractId}_${type}_id_${id}`;
if (withSchema) {
tableName = `data.${tableName}`;
}

return tableName;
}

private static _getTableOptions(contract: Contract, { event }: { event?: Event }): TableOptions {
Expand Down Expand Up @@ -802,12 +807,17 @@ VALUES

private async _createEventTable(contract: Contract, event: Event): Promise<void> {
return getConnection().transaction(async (entityManager) => {
const tableName = DataService._getTableName({
const tableNameWithSchema = DataService._getTableName({
contractId: contract.contractId,
type: 'event',
id: event.eventId
});
const table = await entityManager.queryRunner.getTable(tableName);
const tableName = DataService._getTableName({
contractId: contract.contractId,
type: 'event',
id: event.eventId
}, false);
const table = await entityManager.queryRunner.getTable(tableNameWithSchema);

if (table) {
//console.log(`Table ${tableName} already exists`);
Expand All @@ -816,7 +826,13 @@ VALUES

const tableOptions = DataService._getTableOptions(contract, { event });
await entityManager.queryRunner.createTable(new Table(tableOptions), true);
console.log('create new table', tableName);
await entityManager.queryRunner.query(`
CREATE TRIGGER ai
after INSERT ON ${tableNameWithSchema}
for each row
execute procedure graphql_subscription('events', '${tableName}S', 'id');
`)
console.log('create new table', tableNameWithSchema);
});
}

Expand Down

0 comments on commit 7dd44f9

Please sign in to comment.