From 39094523475a02f6042d0c0b4c09d2f1bc07f3ba Mon Sep 17 00:00:00 2001 From: Will Scullin Date: Mon, 6 Nov 2023 09:52:55 -0800 Subject: [PATCH] Postgres fixes --- .../src/postgres_connection.ts | 75 ++++++++++--------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/packages/malloy-db-postgres/src/postgres_connection.ts b/packages/malloy-db-postgres/src/postgres_connection.ts index 369b68199..d828e0625 100644 --- a/packages/malloy-db-postgres/src/postgres_connection.ts +++ b/packages/malloy-db-postgres/src/postgres_connection.ts @@ -43,7 +43,7 @@ import { StreamingConnection, StructDef, } from '@malloydata/malloy'; -import {Client, Pool, PoolClient} from 'pg'; +import {Client, Pool, QueryResult} from 'pg'; import QueryStream from 'pg-query-stream'; import {randomUUID} from 'crypto'; import {FetchSchemaOptions} from '@malloydata/malloy-interfaces'; @@ -102,7 +102,7 @@ export class PostgresConnection } } - private async readConfig(): Promise { + protected async readConfig(): Promise { if (this.configReader instanceof Function) { return this.configReader(); } else { @@ -219,7 +219,7 @@ export class PostgresConnection ): Promise { const client = await this.getClient(); await client.connect(); - await this.connectionSetup(); + await this.connectionSetup(client); let result = await client.query(sqlCommand); if (Array.isArray(result)) { @@ -348,26 +348,20 @@ export class PostgresConnection return structDef; } - public async executeSQLRaw(query: string): Promise { - const config = await this.readQueryConfig(); - const queryData = await this.runPostgresQuery( - query, - config.rowLimit || DEFAULT_PAGE_SIZE, - 0, - false - ); - return queryData.rows; + public async executeSQLRaw(query: string): Promise { + const client = await this.getClient(); + await client.connect(); + const results = await client.query(query); + await client.end(); + return results; } public async test(): Promise { await this.executeSQLRaw('SELECT 1'); } - public async connectionSetup(): Promise { - if (!this.isSetup) { - this.executeSQLRaw("SET TIME ZONE 'UTC'"); - this.isSetup = true; - } + public async connectionSetup(client: Client): Promise { + await client.query("SET TIME ZONE 'UTC'"); } public async runSQL( @@ -429,7 +423,7 @@ export class PooledPostgresConnection extends PostgresConnection implements PooledConnection { - private pool: Pool; + private _pool: Pool | undefined; constructor( name: string, @@ -437,8 +431,6 @@ export class PooledPostgresConnection configReader: PostgresConnectionConfigurationReader = {} ) { super(name, queryConfigReader, configReader); - this.pool = new Pool(); - this.pool.on('acquire', client => client.query("SET TIME ZONE 'UTC'")); } public isPool(): true { @@ -446,7 +438,28 @@ export class PooledPostgresConnection } public async drain(): Promise { - await this.pool.end(); + await this._pool?.end(); + } + + async getPool(): Promise { + if (!this._pool) { + const { + username: user, + password, + databaseName: database, + port, + host, + } = await this.readConfig(); + this._pool = new Pool({ + user, + password, + database, + port, + host, + }); + this._pool.on('acquire', client => client.query("SET TIME ZONE 'UTC'")); + } + return this._pool; } protected async runPostgresQuery( @@ -455,7 +468,8 @@ export class PooledPostgresConnection _rowIndex: number, deJSON: boolean ): Promise { - let result = await this.pool.query(sqlCommand); + const pool = await this.getPool(); + let result = await pool.query(sqlCommand); if (Array.isArray(result)) { result = result.pop(); @@ -471,18 +485,6 @@ export class PooledPostgresConnection }; } - private async getClientFromPool(): Promise<[PoolClient, () => void]> { - return await new Promise((resolve, reject) => - this.pool.connect((error, client: PoolClient, releaseClient) => { - if (error) { - reject(error); - } else { - resolve([client, releaseClient]); - } - }) - ); - } - public async *runSQLStream( sqlCommand: string, options?: {rowLimit?: number} @@ -493,7 +495,8 @@ export class PooledPostgresConnection // type. Because `query` is a `QueryStream`, the result is supposed to be a // `QueryStream` as well, but it's not. So instead, we get a client and call // `client.query(query)`, which does what it's supposed to. - const [client, releaseClient] = await this.getClientFromPool(); + const pool = await this.getPool(); + const client = await pool.connect(); const resultStream: QueryStream = client.query(query); for await (const row of resultStream) { yield row.row as QueryDataRow; @@ -503,7 +506,7 @@ export class PooledPostgresConnection break; } } - releaseClient(); + client.release(); } async close(): Promise {