Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres fixes #1476

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 39 additions & 36 deletions packages/malloy-db-postgres/src/postgres_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import {
StreamingConnection,
StructDef,
} from '@malloydata/malloy';
import {Client, Pool, PoolClient} from 'pg';
import {Client, Pool, QueryResult} from 'pg';
import QueryStream from 'pg-query-stream';
import {randomUUID} from 'crypto';
import {FetchSchemaOptions} from '@malloydata/malloy-interfaces';
Expand Down Expand Up @@ -102,7 +102,7 @@ export class PostgresConnection
}
}

private async readConfig(): Promise<PostgresConnectionConfiguration> {
protected async readConfig(): Promise<PostgresConnectionConfiguration> {
if (this.configReader instanceof Function) {
return this.configReader();
} else {
Expand Down Expand Up @@ -219,7 +219,7 @@ export class PostgresConnection
): Promise<MalloyQueryData> {
const client = await this.getClient();
await client.connect();
await this.connectionSetup();
await this.connectionSetup(client);

let result = await client.query(sqlCommand);
if (Array.isArray(result)) {
Expand Down Expand Up @@ -348,26 +348,20 @@ export class PostgresConnection
return structDef;
}

public async executeSQLRaw(query: string): Promise<QueryData> {
const config = await this.readQueryConfig();
const queryData = await this.runPostgresQuery(
query,
config.rowLimit || DEFAULT_PAGE_SIZE,
0,
false
);
return queryData.rows;
public async executeSQLRaw(query: string): Promise<QueryResult> {
const client = await this.getClient();
await client.connect();
const results = await client.query(query);
await client.end();
return results;
}

public async test(): Promise<void> {
await this.executeSQLRaw('SELECT 1');
}

public async connectionSetup(): Promise<void> {
if (!this.isSetup) {
this.executeSQLRaw("SET TIME ZONE 'UTC'");
this.isSetup = true;
}
public async connectionSetup(client: Client): Promise<void> {
await client.query("SET TIME ZONE 'UTC'");
}

public async runSQL(
Expand Down Expand Up @@ -429,24 +423,43 @@ export class PooledPostgresConnection
extends PostgresConnection
implements PooledConnection
{
private pool: Pool;
private _pool: Pool | undefined;

constructor(
name: string,
queryConfigReader: PostgresQueryConfigurationReader = {},
configReader: PostgresConnectionConfigurationReader = {}
) {
super(name, queryConfigReader, configReader);
this.pool = new Pool();
this.pool.on('acquire', client => client.query("SET TIME ZONE 'UTC'"));
}

public isPool(): true {
return true;
}

public async drain(): Promise<void> {
await this.pool.end();
await this._pool?.end();
}

async getPool(): Promise<Pool> {
if (!this._pool) {
const {
username: user,
password,
databaseName: database,
port,
host,
} = await this.readConfig();
this._pool = new Pool({
user,
password,
database,
port,
host,
});
this._pool.on('acquire', client => client.query("SET TIME ZONE 'UTC'"));
}
return this._pool;
}

protected async runPostgresQuery(
Expand All @@ -455,7 +468,8 @@ export class PooledPostgresConnection
_rowIndex: number,
deJSON: boolean
): Promise<MalloyQueryData> {
let result = await this.pool.query(sqlCommand);
const pool = await this.getPool();
let result = await pool.query(sqlCommand);

if (Array.isArray(result)) {
result = result.pop();
Expand All @@ -471,18 +485,6 @@ export class PooledPostgresConnection
};
}

private async getClientFromPool(): Promise<[PoolClient, () => void]> {
return await new Promise((resolve, reject) =>
this.pool.connect((error, client: PoolClient, releaseClient) => {
if (error) {
reject(error);
} else {
resolve([client, releaseClient]);
}
})
);
}

public async *runSQLStream(
sqlCommand: string,
options?: {rowLimit?: number}
Expand All @@ -493,7 +495,8 @@ export class PooledPostgresConnection
// type. Because `query` is a `QueryStream`, the result is supposed to be a
// `QueryStream` as well, but it's not. So instead, we get a client and call
// `client.query(query)`, which does what it's supposed to.
const [client, releaseClient] = await this.getClientFromPool();
const pool = await this.getPool();
const client = await pool.connect();
const resultStream: QueryStream = client.query(query);
for await (const row of resultStream) {
yield row.row as QueryDataRow;
Expand All @@ -503,7 +506,7 @@ export class PooledPostgresConnection
break;
}
}
releaseClient();
client.release();
}

async close(): Promise<void> {
Expand Down