Skip to content

Commit 8af9492

Browse files
committed
snowflake connector handles timeout and abortsignal
Signed-off-by: Carina Koo <carina@datairis.io>
1 parent 9cbdd4d commit 8af9492

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

packages/malloy-db-snowflake/src/snowflake_connection.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ export interface SnowflakeConnectionOptions {
6363
scratchSpace?: namespace;
6464

6565
queryOptions?: RunSQLOptions;
66+
67+
// Timeout for the statement
68+
timeoutMs?: number;
6669
}
6770

6871
type PathChain =
@@ -207,6 +210,11 @@ class SnowArray extends SnowField {
207210
}
208211
}
209212

213+
/**
214+
* Default statement timeoutMs value, 10 Mins
215+
*/
216+
const TIMEOUT_MS = 1000 * 60 * 10;
217+
210218
export class SnowflakeConnection
211219
extends BaseConnection
212220
implements
@@ -221,6 +229,7 @@ export class SnowflakeConnection
221229
// the database & schema where we do temporary operations like creating a temp table
222230
private scratchSpace?: namespace;
223231
private queryOptions: RunSQLOptions;
232+
private timeoutMs: number;
224233

225234
constructor(
226235
public readonly name: string,
@@ -235,6 +244,7 @@ export class SnowflakeConnection
235244
this.executor = new SnowflakeExecutor(connOptions, options?.poolOptions);
236245
this.scratchSpace = options?.scratchSpace;
237246
this.queryOptions = options?.queryOptions ?? {};
247+
this.timeoutMs = options?.timeoutMs ?? TIMEOUT_MS;
238248
}
239249

240250
get dialectName(): string {
@@ -273,10 +283,10 @@ export class SnowflakeConnection
273283

274284
public async runSQL(
275285
sql: string,
276-
options?: RunSQLOptions
286+
options: RunSQLOptions = {}
277287
): Promise<MalloyQueryData> {
278288
const rowLimit = options?.rowLimit ?? this.queryOptions?.rowLimit;
279-
let rows = await this.executor.batch(sql);
289+
let rows = await this.executor.batch(sql, options, this.timeoutMs);
280290
if (rowLimit !== undefined && rows.length > rowLimit) {
281291
rows = rows.slice(0, rowLimit);
282292
}

packages/malloy-db-snowflake/src/snowflake_executor.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export interface ConnectionConfigFile {
5656
// return ret;
5757
// }
5858

59+
5960
export class SnowflakeExecutor {
6061
private static defaultPoolOptions_: PoolOptions = {
6162
min: 1,
@@ -149,15 +150,25 @@ export class SnowflakeExecutor {
149150
});
150151
}
151152

152-
public async _execute(sqlText: string, conn: Connection): Promise<QueryData> {
153-
return new Promise((resolve, reject) => {
154-
const _statment = conn.execute({
153+
public async _execute(sqlText: string, conn: Connection, options?: RunSQLOptions, timeoutMs?: number): Promise<QueryData> {
154+
let _statement: RowStatement | undefined;
155+
const cancel = () => {
156+
_statement?.cancel();
157+
}
158+
const timeoutId = timeoutMs ? setTimeout(cancel, timeoutMs) : undefined;
159+
options?.abortSignal?.addEventListener('abort', cancel);
160+
return await new Promise((resolve, reject) => {
161+
_statement = conn.execute({
155162
sqlText,
156163
complete: (
157164
err: SnowflakeError | undefined,
158165
_stmt: RowStatement,
159166
rows?: QueryData
160167
) => {
168+
options?.abortSignal?.removeEventListener('abort', cancel);
169+
if (timeoutId) {
170+
clearTimeout(timeoutId);
171+
}
161172
if (err) {
162173
reject(err);
163174
} else if (rows) {
@@ -186,10 +197,10 @@ export class SnowflakeExecutor {
186197
);
187198
}
188199

189-
public async batch(sqlText: string): Promise<QueryData> {
200+
public async batch(sqlText: string, options?: RunSQLOptions, timeoutMs?: number): Promise<QueryData> {
190201
return await this.pool_.use(async (conn: Connection) => {
191202
await this._setSessionParams(conn);
192-
return await this._execute(sqlText, conn);
203+
return await this._execute(sqlText, conn, options, timeoutMs);
193204
});
194205
}
195206

0 commit comments

Comments
 (0)