From 16688da1da7cf4c4c79fd8bfd83399a7a13f8ab7 Mon Sep 17 00:00:00 2001 From: Lloyd Tabb Date: Mon, 8 Jul 2024 11:46:46 -0700 Subject: [PATCH 01/23] Initial cut at presto connector --- package-lock.json | 6 + packages/malloy-db-trino/package.json | 1 + .../src/trino_connection.spec.ts | 2 +- .../malloy-db-trino/src/trino_connection.ts | 351 ++++++++++-------- .../malloy-db-trino/src/trino_executor.ts | 53 ++- packages/malloy/src/dialect/dialect_map.ts | 3 +- packages/malloy/src/dialect/trino/trino.ts | 4 + test/src/runtimes.ts | 10 +- 8 files changed, 249 insertions(+), 181 deletions(-) diff --git a/package-lock.json b/package-lock.json index 109d8ed6f..3c749a843 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6150,6 +6150,11 @@ "dev": true, "license": "0BSD" }, + "node_modules/@prestodb/presto-js-client": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/@prestodb/presto-js-client/-/presto-js-client-1.0.0.tgz", + "integrity": "sha512-B8d0Wl8XMrtqotRTTM45GVk6bV7GER1Pbt4vUb2Ex5dUP1/WkqKMuD0LMWoO9zaiSS70DtBpvwu372BS1GjNgQ==" + }, "node_modules/@radix-ui/number": { "version": "1.0.1", "dev": true, @@ -28283,6 +28288,7 @@ "license": "MIT", "dependencies": { "@malloydata/malloy": "^0.0.130", + "@prestodb/presto-js-client": "^1.0.0", "gaxios": "^4.2.0", "trino-client": "^0.2.2" }, diff --git a/packages/malloy-db-trino/package.json b/packages/malloy-db-trino/package.json index 79ed5eb17..d1fdae841 100644 --- a/packages/malloy-db-trino/package.json +++ b/packages/malloy-db-trino/package.json @@ -23,6 +23,7 @@ }, "dependencies": { "@malloydata/malloy": "^0.0.130", + "@prestodb/presto-js-client": "^1.0.0", "gaxios": "^4.2.0", "trino-client": "^0.2.2" }, diff --git a/packages/malloy-db-trino/src/trino_connection.spec.ts b/packages/malloy-db-trino/src/trino_connection.spec.ts index af906c9e0..b5bd22ffc 100644 --- a/packages/malloy-db-trino/src/trino_connection.spec.ts +++ b/packages/malloy-db-trino/src/trino_connection.spec.ts @@ -43,7 +43,7 @@ 
describe('Trino connection', () => { connection = new TrinoConnection( 'trino', {}, - TrinoExecutor.getConnectionOptionsFromEnv() + TrinoExecutor.getConnectionOptionsFromEnv('trino') ); }); diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index e7fa5834c..1bb04b230 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -42,6 +42,7 @@ import { StreamingConnection, StructDef, } from '@malloydata/malloy'; +import {PrestoClient, PrestoQuery} from '@prestodb/presto-js-client'; import {randomUUID} from 'crypto'; import {Trino, BasicAuth} from 'trino-client'; @@ -57,16 +58,88 @@ export interface TrinoManagerOptions { export interface TrinoConnectionConfiguration { server?: string; + port?: number; catalog?: string; schema?: string; user?: string; password?: string; } -type TrinoConnectionOptions = ConnectionConfig; +export type TrinoConnectionOptions = ConnectionConfig; + +export interface BaseConnection { + runSQL( + sql: string, + limit: number | undefined + ): Promise<{rows: unknown[][]; columns: {name: string; type: string}[]}>; +} + +class PrestoBase implements BaseConnection { + client: PrestoClient; + constructor(config: TrinoConnectionConfiguration) { + this.client = new PrestoClient({ + catalog: config.catalog, + host: config.server, + port: config.port, + schema: config.schema, + timezone: 'America/Costa_Rica', + user: config.user || 'anyone', + }); + } + async runSQL(sql: string, limit: number | undefined) { + let ret: PrestoQuery; + const q = limit ? 
`SELECT * FROM(${sql}) LIMIT ${limit}` : sql; + try { + ret = (await this.client.query(q)) || []; + // console.log(ret); + } catch (error) { + // console.log(error); + throw new Error(error); + } + return { + rows: ret.data || [], + columns: ret.columns as {name: string; type: string}[], + }; + } +} + +class TrinooBase implements BaseConnection { + client: Trino; + constructor(config: TrinoConnectionConfiguration) { + this.client = Trino.create({ + catalog: config.catalog, + server: config.server, + schema: config.schema, + auth: new BasicAuth(config.user!, config.password), + }); + } + async runSQL(sql: string, limit: number | undefined) { + const result = await this.client.query(sql); + let queryResult = await result.next(); + const columns = queryResult.value.columns; + + const outputRows: unknown[][] = []; + while (queryResult !== null && (!limit || outputRows.length < limit)) { + const rows = queryResult.value.data ?? []; + for (const row of rows) { + if (!limit || outputRows.length < limit) { + outputRows.push(row as unknown[]); + } + } + if (!queryResult.done) { + queryResult = await result.next(); + } else { + break; + } + } + // console.log(outputRows); + // console.log(columns); + return {rows: outputRows, columns}; + } +} // manage access to BQ, control costs, enforce global data/API limits -export class TrinoConnection implements Connection, PersistSQLResults { +export class TrinoPrestoConnection implements Connection, PersistSQLResults { trinoToMalloyTypes: {[key: string]: FieldAtomicTypeDef} = { 'varchar': {type: 'string'}, 'integer': {type: 'number', numberType: 'integer'}, @@ -104,7 +177,7 @@ export class TrinoConnection implements Connection, PersistSQLResults { return undefined; } - public readonly name: string; + public name: string; private readonly dialect = new StandardSQLDialect(); static DEFAULT_QUERY_OPTIONS: RunSQLOptions = { rowLimit: 10, @@ -127,52 +200,28 @@ export class TrinoConnection implements Connection, PersistSQLResults { private 
queryOptions?: QueryOptionsReader; - private config: TrinoConnectionConfiguration; + //private config: TrinoConnectionConfiguration; - private trino: Trino; + private client: BaseConnection; - constructor( - option: TrinoConnectionOptions, - queryOptions?: QueryOptionsReader - ); constructor( name: string, queryOptions?: QueryOptionsReader, - config?: TrinoConnectionConfiguration - ); - constructor( - arg: string | TrinoConnectionOptions, - queryOptions?: QueryOptionsReader, - config: TrinoConnectionConfiguration = {} + pConfig?: TrinoConnectionConfiguration ) { - this.name = 'trino'; - /* if (typeof arg === 'string') { - this.name = arg; + const config = pConfig || {}; + this.name = name; + if (name === 'trino') { + this.client = new TrinooBase(config); } else { - const {name, client_email, private_key, ...args} = arg; - this.name = name; - config = args; - if (client_email || private_key) { - config.credentials = { - client_email, - private_key, - }; - } - }*/ - // TODO: check user is set. - this.trino = Trino.create({ - server: config.server, - catalog: config.catalog, - schema: config.schema, - auth: new BasicAuth(config.user!, config.password), - }); - + this.client = new PrestoBase(config); + } this.queryOptions = queryOptions; - this.config = config; + //this.config = config; } get dialectName(): string { - return 'trino'; + return this.name; } private readQueryOptions(): RunSQLOptions { @@ -208,51 +257,6 @@ export class TrinoConnection implements Connection, PersistSQLResults { throw new Error('not implemented 1'); } - /* private async _runSQL( - sqlCommand: string, - {rowLimit, abortSignal}: RunSQLOptions = {}, - rowIndex = 0 - ): Promise<{ - data: MalloyQueryData; - schema: Trino.ITableFieldSchema | undefined; - }> { - const defaultOptions = this.readQueryOptions(); - const pageSize = rowLimit ?? 
defaultOptions.rowLimit; - - try { - const queryResultsOptions: QueryResultsOptions = { - maxResults: pageSize, - startIndex: rowIndex.toString(), - }; - - const jobResult = await this.createTrinoJobAndGetResults( - sqlCommand, - undefined, - queryResultsOptions, - abortSignal - ); - - const totalRows = +(jobResult[2]?.totalRows - ? jobResult[2].totalRows - : '0'); - - // TODO even though we have 10 minute timeout limit, we still should confirm that resulting metadata has "jobComplete: true" - const queryCostBytes = jobResult[2]?.totalBytesProcessed; - const data: MalloyQueryData = { - rows: jobResult[0], - totalRows, - runStats: { - queryCostBytes: queryCostBytes ? +queryCostBytes : undefined, - }, - }; - const schema = jobResult[2]?.schema; - - return {data, schema}; - } catch (e) { - throw maybeRewriteError(e); - } - }*/ - convertRow(structDef: StructDef, _row: unknown) { const retRow = {}; const row = _row as []; @@ -295,19 +299,23 @@ export class TrinoConnection implements Connection, PersistSQLResults { // TODO(figutierrez): Use. _rowIndex = 0 ): Promise { - const result = await this.trino.query(sqlCommand); - let queryResult = await result.next(); - if (queryResult.value.error) { - // TODO: handle. - const {failureInfo: _, ...error} = queryResult.value.error; - throw new Error( - `Failed to execute sql: ${sqlCommand}. \n Error: ${JSON.stringify( - error - )}` - ); - } - - const malloyColumns = queryResult.value.columns.map(c => + // const result = await this.trino.query(sqlCommand); + // let queryResult = await result.next(); + // if (queryResult.value.error) { + // // TODO: handle. + // const {failureInfo: _, ...error} = queryResult.value.error; + // throw new Error( + // `Failed to execute sql: ${sqlCommand}. 
\n Error: ${JSON.stringify( + // error + // )}` + // ); + // } + + const r = await this.client.runSQL(sqlCommand, options.rowLimit); + const inputRows = r.rows; + const columns = r.columns; + + const malloyColumns = columns.map(c => this.malloyTypeFromTrinoType(c.name, c.type) ); @@ -316,58 +324,49 @@ export class TrinoConnection implements Connection, PersistSQLResults { // console.log(JSON.stringify(malloyColumns, null, 2)); // console.log(JSON.stringify(queryResult.value.data, null, 2)); - let maxRows = options.rowLimit ?? 50; const malloyRows: QueryDataRow[] = []; - while (queryResult !== null && maxRows--) { - const rows = queryResult.value.data ?? []; - for (const row of rows) { - const malloyRow: QueryDataRow = {}; - for (let i = 0; i < queryResult.value.columns.length; i++) { - const column = queryResult.value.columns[i]; - if (malloyColumns[i].type === 'struct') { - const structDef = malloyColumns[i] as StructDef; - if (structDef.structSource.type === 'inline') { - malloyRow[column.name] = this.convertRow( - structDef, - row[i] - ) as QueryValue; - } else { - malloyRow[column.name] = this.convertNest( - structDef, - row[i] - ) as QueryValue; - } - // console.log( - // column.name, - // JSON.stringify(malloyColumns[i], null, 2), - // JSON.stringify(row[i]), - // JSON.stringify(malloyRow[column.name]) - // ); - } else if ( - malloyColumns[i].type === 'number' && - typeof row[i] === 'string' - ) { - // decimal numbers come back as strings - malloyRow[column.name] = +row[i]; - } else if ( - malloyColumns[i].type === 'timestamp' && - typeof row[i] === 'string' - ) { - // timestamps come back as strings - malloyRow[column.name] = new Date(row[i]); + const rows = inputRows ?? 
[]; + for (const row of rows) { + const malloyRow: QueryDataRow = {}; + for (let i = 0; i < columns.length; i++) { + const column = columns[i]; + if (malloyColumns[i].type === 'struct') { + const structDef = malloyColumns[i] as StructDef; + if (structDef.structSource.type === 'inline') { + malloyRow[column.name] = this.convertRow( + structDef, + row[i] + ) as QueryValue; } else { - malloyRow[column.name] = row[i] as QueryValue; + malloyRow[column.name] = this.convertNest( + structDef, + row[i] + ) as QueryValue; } + // console.log( + // column.name, + // JSON.stringify(malloyColumns[i], null, 2), + // JSON.stringify(row[i]), + // JSON.stringify(malloyRow[column.name]) + // ); + } else if ( + malloyColumns[i].type === 'number' && + typeof row[i] === 'string' + ) { + // decimal numbers come back as strings + malloyRow[column.name] = Number(row[i]); + } else if ( + malloyColumns[i].type === 'timestamp' && + typeof row[i] === 'string' + ) { + // timestamps come back as strings + malloyRow[column.name] = new Date(row[i] as string); + } else { + malloyRow[column.name] = row[i] as QueryValue; } - - malloyRows.push(malloyRow); } - if (!queryResult.done) { - queryResult = await result.next(); - } else { - break; - } + malloyRows.push(malloyRow); } // TODO(figutierrez): Remove. @@ -523,9 +522,9 @@ export class TrinoConnection implements Connection, PersistSQLResults { } private async executeAndWait(sqlBlock: string): Promise { - const result = await this.trino.query(sqlBlock); + await this.client.runSQL(sqlBlock, undefined); // TODO: make sure failure is handled correctly. - while (!(await result.next()).done); + //while (!(await result.next()).done); } splitColumns(s: string) { @@ -660,20 +659,18 @@ export class TrinoConnection implements Connection, PersistSQLResults { element: string ): Promise { try { - const result = await this.trino.query(sqlBlock); - - const queryResult = await result.next(); - - if (queryResult.value.error) { - // TODO: handle. 
- throw new Error( - `Failed to grab schema for ${element}: ${JSON.stringify( - queryResult.value.error - )}` - ); - } - - const rows: string[][] = queryResult.value.data ?? []; + const queryResult = await this.client.runSQL(sqlBlock, undefined); + + // if (queryResult.error) { + // // TODO: handle. + // throw new Error( + // `Failed to grab schema for ${element}: ${JSON.stringify( + // queryResult.value.error + // )}` + // ); + // } + + const rows: string[][] = (queryResult.rows as string[][]) ?? []; this.structDefFromSchema(rows, structDef); } catch (e) { throw new Error(`Could not fetch schema for ${element} ${e}`); @@ -711,3 +708,41 @@ export class TrinoConnection implements Connection, PersistSQLResults { return; } } + +export class PrestoConnection extends TrinoPrestoConnection { + constructor( + name: string, + queryOptions?: QueryOptionsReader, + config?: TrinoConnectionConfiguration + ); + constructor( + option: TrinoConnectionOptions, + queryOptions?: QueryOptionsReader + ); + constructor( + arg: string | TrinoConnectionOptions, + queryOptions?: QueryOptionsReader, + config: TrinoConnectionConfiguration = {} + ) { + super('presto', queryOptions, config); + } +} + +export class TrinoConnection extends TrinoPrestoConnection { + constructor( + name: string, + queryOptions?: QueryOptionsReader, + config?: TrinoConnectionConfiguration + ); + constructor( + option: TrinoConnectionOptions, + queryOptions?: QueryOptionsReader + ); + constructor( + arg: string | TrinoConnectionOptions, + queryOptions?: QueryOptionsReader, + config: TrinoConnectionConfiguration = {} + ) { + super('trino', queryOptions, config); + } +} diff --git a/packages/malloy-db-trino/src/trino_executor.ts b/packages/malloy-db-trino/src/trino_executor.ts index 973414004..e5edf4e04 100644 --- a/packages/malloy-db-trino/src/trino_executor.ts +++ b/packages/malloy-db-trino/src/trino_executor.ts @@ -23,33 +23,46 @@ import {TrinoConnectionConfiguration} from './trino_connection'; -export class 
TrinoExecutor { - public static getConnectionOptionsFromEnv(): - | TrinoConnectionConfiguration - | undefined { - const server = process.env['TRINO_SERVER']; - if (server) { - const user = process.env['TRINO_USER']; +// Differences: +// Trino uses TRINO_SERVER +// Presto users PRESTO_HOST/PRESTO_PORT +// Trino requires TRINO_USER - if (!user) { +export class TrinoExecutor { + public static getConnectionOptionsFromEnv( + dialectName: 'trino' | 'presto' + ): TrinoConnectionConfiguration | undefined { + const envPrefix = dialectName.toUpperCase(); + const user = process.env[`${envPrefix}_USER`]; + let server; + let port: number | undefined = undefined; + if (dialectName === 'trino') { + server = process.env['TRINO_SERVER']; + if (!user && server) { throw Error( 'Trino server specified but no user was provided. Set TRINO_USER and TRINO_PASSWORD environment variables' ); } + } else { + server = process.env['PRESTO_HOST']; + port = Number(process.env['PRESTO_PORT']) || 8080; + } - const password = process.env['TRINO_PASSWORD']; - // TODO(figutierrez): We may not need to support these. - const catalog = process.env['TRINO_CATALOG']; - const schema = process.env['TRINO_SCHEMA']; - return { - server, - user, - password, - catalog, - schema, - }; + if (!server) { + return undefined; } - return undefined; + const password = process.env[`${envPrefix}_PASSWORD`]; + // TODO(figutierrez): We may not need to support these. 
+ const catalog = process.env[`${envPrefix}_CATALOG`]; + const schema = process.env[`${envPrefix}_SCHEMA`]; + return { + server, + user, + port, + password, + catalog, + schema, + }; } } diff --git a/packages/malloy/src/dialect/dialect_map.ts b/packages/malloy/src/dialect/dialect_map.ts index 8dbd5dcaa..b2ff175c6 100644 --- a/packages/malloy/src/dialect/dialect_map.ts +++ b/packages/malloy/src/dialect/dialect_map.ts @@ -26,7 +26,7 @@ import {Dialect} from './dialect'; import {PostgresDialect} from './postgres'; import {SnowflakeDialect} from './snowflake'; import {StandardSQLDialect} from './standardsql'; -import {TrinoDialect} from './trino'; +import {PrestoDialect, TrinoDialect} from './trino'; import {FunctionDef, FunctionOverloadDef} from '../model/malloy_types'; import {DialectFunctionOverloadDef} from './functions'; @@ -49,6 +49,7 @@ registerDialect(new StandardSQLDialect()); registerDialect(new DuckDBDialect()); registerDialect(new SnowflakeDialect()); registerDialect(new TrinoDialect()); +registerDialect(new PrestoDialect()); function paramsEqual( a: DialectFunctionOverloadDef, diff --git a/packages/malloy/src/dialect/trino/trino.ts b/packages/malloy/src/dialect/trino/trino.ts index 8072090f2..c8f9771ff 100644 --- a/packages/malloy/src/dialect/trino/trino.ts +++ b/packages/malloy/src/dialect/trino/trino.ts @@ -604,3 +604,7 @@ ${indent(sql)} return sqlType.match(/^[A-Za-z\s(),<>0-9]*$/) !== null; } } + +export class PrestoDialect extends TrinoDialect { + name = 'presto'; +} diff --git a/test/src/runtimes.ts b/test/src/runtimes.ts index 24478e370..a26b43764 100644 --- a/test/src/runtimes.ts +++ b/test/src/runtimes.ts @@ -37,6 +37,7 @@ import {SnowflakeConnection} from '@malloydata/db-snowflake'; import {PooledPostgresConnection} from '@malloydata/db-postgres'; import {TrinoConnection, TrinoExecutor} from '@malloydata/db-trino'; import {SnowflakeExecutor} from '@malloydata/db-snowflake/src/snowflake_executor'; +import {PrestoConnection} from 
'@malloydata/db-trino/src/trino_connection'; export class SnowflakeTestConnection extends SnowflakeConnection { public async runSQL( @@ -166,7 +167,14 @@ export function runtimeFor(dbName: string): SingleConnectionRuntime { connection = new TrinoConnection( dbName, {}, - TrinoExecutor.getConnectionOptionsFromEnv() + TrinoExecutor.getConnectionOptionsFromEnv(dbName) + ); + break; + case 'presto': + connection = new PrestoConnection( + dbName, + {}, + TrinoExecutor.getConnectionOptionsFromEnv(dbName) // they share configs. ); break; default: From 426c6453420fca575bdcf063dd241c8511cafc97 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Tue, 9 Jul 2024 14:33:08 -0600 Subject: [PATCH 02/23] Add abstract method and implementations to grab schema for sql block. --- .../malloy-db-trino/src/trino_connection.ts | 90 ++++++++++++++++--- 1 file changed, 77 insertions(+), 13 deletions(-) diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index 1bb04b230..b84d96c75 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -139,7 +139,7 @@ class TrinooBase implements BaseConnection { } // manage access to BQ, control costs, enforce global data/API limits -export class TrinoPrestoConnection implements Connection, PersistSQLResults { +export abstract class TrinoPrestoConnection implements Connection, PersistSQLResults { trinoToMalloyTypes: {[key: string]: FieldAtomicTypeDef} = { 'varchar': {type: 'string'}, 'integer': {type: 'number', numberType: 'integer'}, @@ -510,18 +510,14 @@ export class TrinoPrestoConnection implements Connection, PersistSQLResults { fields: [], }; - const tmpQueryName = `myMalloyQuery${randomUUID().replace(/-/g, '')}`; - await this.executeAndWait( - `PREPARE ${tmpQueryName} FROM ${sqlRef.selectStr}` - ); - return await this.loadSchemaForSqlBlock( - `DESCRIBE OUTPUT ${tmpQueryName}`, - structDef, - `query 
${sqlRef.selectStr.substring(0, 50)}` - ); + this.fillStructDefForSqlBlockSchema(sqlRef.sql, structDef) + + return structDef; } - private async executeAndWait(sqlBlock: string): Promise { + protected abstract fillStructDefForSqlBlockSchema(sql: string, structDef: StructDef): Promise; + + protected async executeAndWait(sqlBlock: string): Promise { await this.client.runSQL(sqlBlock, undefined); // TODO: make sure failure is handled correctly. //while (!(await result.next()).done); @@ -556,7 +552,7 @@ export class TrinoPrestoConnection implements Connection, PersistSQLResults { return columns; } - malloyTypeFromTrinoType( + protected malloyTypeFromTrinoType( name: string, trinoType: string ): FieldAtomicTypeDef | StructDef { @@ -653,7 +649,7 @@ export class TrinoPrestoConnection implements Connection, PersistSQLResults { } } - private async loadSchemaForSqlBlock( + protected async loadSchemaForSqlBlock( sqlBlock: string, structDef: StructDef, element: string @@ -726,6 +722,62 @@ export class PrestoConnection extends TrinoPrestoConnection { ) { super('presto', queryOptions, config); } + + + protected async fillStructDefForSqlBlockSchema(sql: string, structDef: StructDef): Promise { + const explainResult = await this.runSQL(`EXPLAIN ${sql}`, {}); + this.schemaFromExplain(explainResult, structDef); + return structDef; + } + + + private schemaFromExplain(explainResult: MalloyQueryData, structDef: StructDef) { + if (explainResult.rows.length === 0) { + throw new Error('Received empty explain result when trying to fetch schema.'); + } + + const resultFirstRow = explainResult.rows[0]; + + if (resultFirstRow['Query Plan'] === undefined) { + throw new Error(`Explain result has rows but column 'Query Plan' is not present.`); + } + + const expResult = resultFirstRow['Query Plan'] as string; + + const lines = expResult.split('\n'); + if (lines?.length == 0) { + throw new Error('Received invalid explain result when trying to fetch schema.'); + } + + let outputLine = lines[0]; + + 
const namesIndex = outputLine.indexOf(']['); + outputLine = outputLine.substring(namesIndex + 2); + + const lineParts = outputLine.split('] => ['); + + if (lineParts.length != 2) { + throw new Error('There was a problem parsing schema from Explain.'); + } + + + const fieldNamesPart = lineParts[0]; + const fieldNames = fieldNamesPart.split(',').map(e => e.trim()); + + let schemaData = lineParts[1]; + schemaData = schemaData.substring(0, schemaData.length - 1); + const rawFieldsTarget = schemaData.split(',').map(e => e.trim()).map(e => e.split(':')); + + if (rawFieldsTarget.length != fieldNames.length) { + throw new Error('There was a problem parsing schema from Explain. Field names size do not match target fields with types.'); + } + + for (var index = 0; index < fieldNames.length; index++) { + const name = fieldNames[index]; + const type = rawFieldsTarget[index][1]; + structDef.fields.push({name, ...this.malloyTypeFromTrinoType(name, type)}); + } + } } export class TrinoConnection extends TrinoPrestoConnection { @@ -745,4 +797,16 @@ export class TrinoConnection extends TrinoPrestoConnection { ) { super('trino', queryOptions, config); } + + protected async fillStructDefForSqlBlockSchema(sql: string, structDef: StructDef): Promise { + const tmpQueryName = `myMalloyQuery${randomUUID().replace(/-/g, '')}`; + await this.executeAndWait( + `PREPARE ${tmpQueryName} FROM ${sql}` + ); + return await this.loadSchemaForSqlBlock( + `DESCRIBE OUTPUT ${tmpQueryName}`, + structDef, + `query ${sql.substring(0, 50)}` + ); + } } From 7ee5a10348782c9a0f243a3f9465c2314cfe600f Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Tue, 9 Jul 2024 14:57:34 -0600 Subject: [PATCH 03/23] Fix to compile. 
--- packages/malloy-db-trino/src/trino_connection.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index b84d96c75..ac0afa641 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -510,7 +510,7 @@ export abstract class TrinoPrestoConnection implements Connection, PersistSQLRes fields: [], }; - this.fillStructDefForSqlBlockSchema(sqlRef.sql, structDef) + this.fillStructDefForSqlBlockSchema(sqlRef.selectStr, structDef) return structDef; } @@ -552,7 +552,7 @@ export abstract class TrinoPrestoConnection implements Connection, PersistSQLRes return columns; } - protected malloyTypeFromTrinoType( + public malloyTypeFromTrinoType( name: string, trinoType: string ): FieldAtomicTypeDef | StructDef { @@ -727,7 +727,6 @@ export class PrestoConnection extends TrinoPrestoConnection { protected async fillStructDefForSqlBlockSchema(sql: string, structDef: StructDef): Promise { const explainResult = await this.runSQL(`EXPLAIN ${sql}`, {}); this.schemaFromExplain(explainResult, structDef); - return structDef; } @@ -803,7 +802,7 @@ export class TrinoConnection extends TrinoPrestoConnection { await this.executeAndWait( `PREPARE ${tmpQueryName} FROM ${sql}` ); - return await this.loadSchemaForSqlBlock( + await this.loadSchemaForSqlBlock( `DESCRIBE OUTPUT ${tmpQueryName}`, structDef, `query ${sql.substring(0, 50)}` From 463f2625d965d9eab820c9ebed421c027333599d Mon Sep 17 00:00:00 2001 From: Lloyd Tabb Date: Thu, 11 Jul 2024 08:47:29 -0700 Subject: [PATCH 04/23] More improvements --- .../malloy-db-trino/src/trino_connection.ts | 87 +++++++++++++------ packages/malloy/src/dialect/trino/trino.ts | 14 +-- 2 files changed, 69 insertions(+), 32 deletions(-) diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index ac0afa641..bcf3c5b99 100644 
--- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -139,7 +139,9 @@ class TrinooBase implements BaseConnection { } // manage access to BQ, control costs, enforce global data/API limits -export abstract class TrinoPrestoConnection implements Connection, PersistSQLResults { +export abstract class TrinoPrestoConnection + implements Connection, PersistSQLResults +{ trinoToMalloyTypes: {[key: string]: FieldAtomicTypeDef} = { 'varchar': {type: 'string'}, 'integer': {type: 'number', numberType: 'integer'}, @@ -257,9 +259,13 @@ export abstract class TrinoPrestoConnection implements Connection, PersistSQLRes throw new Error('not implemented 1'); } + unpackArray(data: unknown): unknown[] { + return data as unknown[]; + } + convertRow(structDef: StructDef, _row: unknown) { const retRow = {}; - const row = _row as []; + const row = this.unpackArray(_row); for (let i = 0; i < structDef.fields.length; i++) { const field = structDef.fields[i]; @@ -278,7 +284,8 @@ export abstract class TrinoPrestoConnection implements Connection, PersistSQLRes return retRow; } - convertNest(structDef: StructDef, data: unknown) { + convertNest(structDef: StructDef, _data: unknown) { + const data = this.unpackArray(_data); const ret: unknown[] = []; //console.log( // `${JSON.stringify(structDef, null, 2)} ${JSON.stringify(data, null, 2)} ` @@ -510,12 +517,15 @@ export abstract class TrinoPrestoConnection implements Connection, PersistSQLRes fields: [], }; - this.fillStructDefForSqlBlockSchema(sqlRef.selectStr, structDef) + this.fillStructDefForSqlBlockSchema(sqlRef.selectStr, structDef); return structDef; } - protected abstract fillStructDefForSqlBlockSchema(sql: string, structDef: StructDef): Promise; + protected abstract fillStructDefForSqlBlockSchema( + sql: string, + structDef: StructDef + ): Promise; protected async executeAndWait(sqlBlock: string): Promise { await this.client.runSQL(sqlBlock, undefined); @@ -614,7 +624,8 @@ export 
abstract class TrinoPrestoConnection implements Connection, PersistSQLRes parts = innerType.match(/^(.+)\s(\S+)$/); } if (parts) { - const innerName = parts[1]; + // remove quotes from the name + const innerName = parts[1].replace(/^"(.+(?="$))"$/, '$1'); const innerTrinoType = parts[2]; const innerMalloyType = this.malloyTypeFromTrinoType( innerName, @@ -723,29 +734,39 @@ export class PrestoConnection extends TrinoPrestoConnection { super('presto', queryOptions, config); } - - protected async fillStructDefForSqlBlockSchema(sql: string, structDef: StructDef): Promise { + protected async fillStructDefForSqlBlockSchema( + sql: string, + structDef: StructDef + ): Promise { const explainResult = await this.runSQL(`EXPLAIN ${sql}`, {}); this.schemaFromExplain(explainResult, structDef); } - - private schemaFromExplain(explainResult: MalloyQueryData, structDef: StructDef) { + private schemaFromExplain( + explainResult: MalloyQueryData, + structDef: StructDef + ) { if (explainResult.rows.length === 0) { - throw new Error('Received empty explain result when trying to fetch schema.'); + throw new Error( + 'Received empty explain result when trying to fetch schema.' + ); } const resultFirstRow = explainResult.rows[0]; if (resultFirstRow['Query Plan'] === undefined) { - throw new Error(`Explain result has rows but column 'Query Plan' is not present.`); + throw new Error( + "Explain result has rows but column 'Query Plan' is not present." + ); } const expResult = resultFirstRow['Query Plan'] as string; const lines = expResult.split('\n'); - if (lines?.length == 0) { - throw new Error('Received invalid explain result when trying to fetch schema.'); + if (lines?.length === 0) { + throw new Error( + 'Received invalid explain result when trying to fetch schema.' 
+ ); } let outputLine = lines[0]; @@ -755,28 +776,39 @@ export class PrestoConnection extends TrinoPrestoConnection { const lineParts = outputLine.split('] => ['); - if (lineParts.length != 2) { - throw new Error('There was a problem parsing schema from Explain.'); + if (lineParts.length !== 2) { + throw new Error('There was a problem parsing schema from Explain.'); } - const fieldNamesPart = lineParts[0]; const fieldNames = fieldNamesPart.split(',').map(e => e.trim()); let schemaData = lineParts[1]; schemaData = schemaData.substring(0, schemaData.length - 1); - const rawFieldsTarget = schemaData.split(',').map(e => e.trim()).map(e => e.split(':')); - - if (rawFieldsTarget.length != fieldNames.length) { - throw new Error('There was a problem parsing schema from Explain. Field names size do not match target fields with types.'); + const rawFieldsTarget = schemaData + .split(',') + .map(e => e.trim()) + .map(e => e.split(':')); + + if (rawFieldsTarget.length !== fieldNames.length) { + throw new Error( + 'There was a problem parsing schema from Explain. Field names size do not match target fields with types.' 
+ ); } - for (var index = 0; index < fieldNames.length; index++) { + for (let index = 0; index < fieldNames.length; index++) { const name = fieldNames[index]; const type = rawFieldsTarget[index][1]; - structDef.fields.push({name, ...this.malloyTypeFromTrinoType(name, type)}); + structDef.fields.push({ + name, + ...this.malloyTypeFromTrinoType(name, type), + }); } } + + unpackArray(data: unknown): unknown[] { + return JSON.parse(data as string); + } } export class TrinoConnection extends TrinoPrestoConnection { @@ -797,11 +829,12 @@ export class TrinoConnection extends TrinoPrestoConnection { super('trino', queryOptions, config); } - protected async fillStructDefForSqlBlockSchema(sql: string, structDef: StructDef): Promise { + protected async fillStructDefForSqlBlockSchema( + sql: string, + structDef: StructDef + ): Promise { const tmpQueryName = `myMalloyQuery${randomUUID().replace(/-/g, '')}`; - await this.executeAndWait( - `PREPARE ${tmpQueryName} FROM ${sql}` - ); + await this.executeAndWait(`PREPARE ${tmpQueryName} FROM ${sql}`); await this.loadSchemaForSqlBlock( `DESCRIBE OUTPUT ${tmpQueryName}`, structDef, diff --git a/packages/malloy/src/dialect/trino/trino.ts b/packages/malloy/src/dialect/trino/trino.ts index c8f9771ff..3ed0a9ab0 100644 --- a/packages/malloy/src/dialect/trino/trino.ts +++ b/packages/malloy/src/dialect/trino/trino.ts @@ -209,16 +209,16 @@ export class TrinoDialect extends Dialect { ); if (isArray) { if (needDistinctKey) { - return `LEFT JOIN UNNEST(zip(${source}, SEQUENCE(1,cardinality(${source})))) as words_0(value,__row_id_from_${alias}) ON TRUE`; + return `CROSS JOIN UNNEST(COALESCE(zip(${source}, SEQUENCE(1,cardinality(${source}))),ARRAY[null])) as words_0(value,__row_id_from_${alias})`; } else { - return `LEFT JOIN UNNEST(transform(${source}, x -> ROW(x) )) as ${alias}(value) ON TRUE`; + return `CROSS JOIN UNNEST(COALESCE(transform(${source}, x -> ROW(x) ), ARRAY[null])) as ${alias}(value)`; } } else if (needDistinctKey) { - return 
`LEFT JOIN UNNEST(zip_with(${source}, SEQUENCE(1,cardinality(${source})), (r,__row_id) -> (r, __row_id))) as ${alias}_outer(${alias},__row_id_from_${alias}) ON TRUE`; + return `CROSS JOIN UNNEST(COALESCE(zip_with(${source}, SEQUENCE(1,cardinality(${source})), (r,__row_id) -> (r, __row_id)),ARRAY[null])) as ${alias}_outer(${alias},__row_id_from_${alias})`; } else { - return `LEFT JOIN UNNEST(${source}) as ${alias}(${fieldsNames.join( + return `CROSS JOIN UNNEST(COALESCE(${source}, ARRAY[null])) as ${alias}(${fieldsNames.join( ', ' - )}) ON TRUE`; + )})`; } } @@ -607,4 +607,8 @@ ${indent(sql)} export class PrestoDialect extends TrinoDialect { name = 'presto'; + + sqlGenerateUUID(): string { + return 'CAST(UUID() AS VARCHAR)'; + } } From 88dfefa5f33af5e4bf742cbacbc8d82ea80d7656 Mon Sep 17 00:00:00 2001 From: Lloyd Tabb Date: Thu, 11 Jul 2024 09:22:47 -0700 Subject: [PATCH 05/23] Missing await. --- test/src/databases/all/nomodel.spec.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/src/databases/all/nomodel.spec.ts b/test/src/databases/all/nomodel.spec.ts index 36b8190c7..8e24c1495 100644 --- a/test/src/databases/all/nomodel.spec.ts +++ b/test/src/databases/all/nomodel.spec.ts @@ -548,6 +548,17 @@ runtimes.runtimeMap.forEach((runtime, databaseName) => { } ); + it(`sql block- ${databaseName}`, async () => { + await expect(` + source: one is ${databaseName}.sql(""" + SELECT 2 as ${q`a`} + """) + run: one -> { + select: a + }`).malloyResultMatches(runtime, {a: 2}); + }); + + // average should only include non-null values in the denominator it(`avg ignore null- ${databaseName}`, async () => { await expect(` @@ -628,6 +639,7 @@ runtimes.runtimeMap.forEach((runtime, databaseName) => { 'ungrouped nested with no grouping above - ${databaseName}', async () => { await expect(` + // # test.debug run: ${databaseName}.table('malloytest.state_facts') extend { measure: total_births is births.sum() measure: births_per_100k is floor(total_births/ 
all(total_births) * 100000) From a1fef36d6175b5b9562a106897d4f3081b0e1d7b Mon Sep 17 00:00:00 2001 From: Lloyd Tabb Date: Thu, 11 Jul 2024 10:09:49 -0700 Subject: [PATCH 06/23] more fixes. --- packages/malloy-db-trino/src/test.spec.ts | 74 +++++++++++++++++++ .../malloy-db-trino/src/trino_connection.ts | 2 +- packages/malloy/src/dialect/trino/trino.ts | 1 + test/src/databases/all/nomodel.spec.ts | 2 + 4 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 packages/malloy-db-trino/src/test.spec.ts diff --git a/packages/malloy-db-trino/src/test.spec.ts b/packages/malloy-db-trino/src/test.spec.ts new file mode 100644 index 000000000..4227f1a50 --- /dev/null +++ b/packages/malloy-db-trino/src/test.spec.ts @@ -0,0 +1,74 @@ +import {Trino, BasicAuth} from 'trino-client'; +import PrestoClient from '@prestodb/presto-js-client'; + +// function CallPresto(client: Client, query: string ) { +// return new Promise (resolve => { +// client.execute({query, data => resolve(response)}) +// }); +// } + +describe('Trino connection', () => { + console.log('hello'); + + test('says hello1', async () => { + const trino: Trino = Trino.create({ + server: 'http://localhost:8090', + catalog: 'bigquery', + schema: 'malloytest', + auth: new BasicAuth('test'), + }); + const limit = 50; + const result = await trino.query( + // 'explain SELECT 1 as one' + 'explain SELECT * FROM malloytest.ga_sample limit 2' + ); + let queryResult = await result.next(); + const columns = queryResult.value.columns; + + const outputRows: unknown[] = []; + while (queryResult !== null && outputRows.length < limit) { + const rows = queryResult.value.data ?? 
[]; + for (const row of rows) { + if (outputRows.length < limit) { + outputRows.push(row); + } + } + if (!queryResult.done) { + queryResult = await result.next(); + } else { + break; + } + } + + const d = outputRows![0]![0]; + console.log(d); + + // console.log(outputRows); + // console.log(columns); + }); + + test('says hello presto', async () => { + const client = new PrestoClient({ + catalog: 'bigquery', + host: 'http://localhost', + port: 8080, + schema: 'malloytest', + timezone: 'America/Costa_Rica', + user: 'root', + }); + + try { + const ret = await client.query( + 'explain SELECT totals FROM malloytest.ga_sample limit 2' + // 'explain select 1 as one, 2 as two' + ); + const d = ret.data![0][0]; + + + // console.log(ret); + console.log(d); + } catch (error) { + console.log(error); + } + }); +}); diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index bcf3c5b99..72dff3c65 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -517,7 +517,7 @@ export abstract class TrinoPrestoConnection fields: [], }; - this.fillStructDefForSqlBlockSchema(sqlRef.selectStr, structDef); + await this.fillStructDefForSqlBlockSchema(sqlRef.selectStr, structDef); return structDef; } diff --git a/packages/malloy/src/dialect/trino/trino.ts b/packages/malloy/src/dialect/trino/trino.ts index 3ed0a9ab0..2f33348c7 100644 --- a/packages/malloy/src/dialect/trino/trino.ts +++ b/packages/malloy/src/dialect/trino/trino.ts @@ -607,6 +607,7 @@ ${indent(sql)} export class PrestoDialect extends TrinoDialect { name = 'presto'; + supportsPipelinesInViews = false; // what a drag... 
sqlGenerateUUID(): string { return 'CAST(UUID() AS VARCHAR)'; diff --git a/test/src/databases/all/nomodel.spec.ts b/test/src/databases/all/nomodel.spec.ts index 8e24c1495..04065a78b 100644 --- a/test/src/databases/all/nomodel.spec.ts +++ b/test/src/databases/all/nomodel.spec.ts @@ -52,6 +52,8 @@ function getSplitFunction(db: string) { `split(${column}, '${splitChar}')`, 'trino': (column: string, splitChar: string) => `split(${column}, '${splitChar}')`, + 'presto': (column: string, splitChar: string) => + `split(${column}, '${splitChar}')`, }[db]; } From c415bb7010ab702fe94e606800f010bdc0139c6e Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 11:30:52 -0700 Subject: [PATCH 07/23] Increase timeout for trino container. --- test/trino/trino_start.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/trino/trino_start.sh b/test/trino/trino_start.sh index 95433ba6c..38a33c106 100755 --- a/test/trino/trino_start.sh +++ b/test/trino/trino_start.sh @@ -21,7 +21,7 @@ do sleep 1 counter=$((counter+1)) # if doesn't start after 2 minutes, output logs and kill process - if [ $counter -eq 120 ] + if [ $counter -eq 300 ] then docker logs trino-malloy >& ./.tmp/trino-malloy.logs docker rm -f trino-malloy From c116786237a9a71932891c1f2c7aac203e09ff86 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 11:42:32 -0700 Subject: [PATCH 08/23] disable says hello tests. 
--- packages/malloy-db-trino/src/test.spec.ts | 3 ++- test/trino/trino_start.sh | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/malloy-db-trino/src/test.spec.ts b/packages/malloy-db-trino/src/test.spec.ts index 4227f1a50..eb414495f 100644 --- a/packages/malloy-db-trino/src/test.spec.ts +++ b/packages/malloy-db-trino/src/test.spec.ts @@ -1,4 +1,4 @@ -import {Trino, BasicAuth} from 'trino-client'; +/*import {Trino, BasicAuth} from 'trino-client'; import PrestoClient from '@prestodb/presto-js-client'; // function CallPresto(client: Client, query: string ) { @@ -72,3 +72,4 @@ describe('Trino connection', () => { } }); }); +*/ \ No newline at end of file diff --git a/test/trino/trino_start.sh b/test/trino/trino_start.sh index 38a33c106..2581d163e 100755 --- a/test/trino/trino_start.sh +++ b/test/trino/trino_start.sh @@ -12,7 +12,7 @@ bigquery.arrow-serialization.enabled=false EOF # run docker -docker run -p 8090:8080 -d -v ./.tmp/bigquery.properties:/etc/trino/catalog/bigquery.properties --name trino-malloy trinodb/trino +docker run -p 8090:8090 -d -v ./.tmp/bigquery.properties:/etc/trino/catalog/bigquery.properties --name trino-malloy trinodb/trino # wait for server to start counter=0 From e69a8d2a95f5a40ff09e34db99e99841c7f3028b Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 11:48:52 -0700 Subject: [PATCH 09/23] Fix empty file. 
--- packages/malloy-db-trino/src/test.spec.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/malloy-db-trino/src/test.spec.ts b/packages/malloy-db-trino/src/test.spec.ts index eb414495f..c9aa26a52 100644 --- a/packages/malloy-db-trino/src/test.spec.ts +++ b/packages/malloy-db-trino/src/test.spec.ts @@ -1,3 +1,4 @@ +export {}; /*import {Trino, BasicAuth} from 'trino-client'; import PrestoClient from '@prestodb/presto-js-client'; From 867a98165a8e3a27bf45ca7e0d7baa93c8c08df0 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 11:56:15 -0700 Subject: [PATCH 10/23] Rename trino temp test file. --- packages/malloy-db-trino/src/{test.spec.ts => test.nonspec.ts} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename packages/malloy-db-trino/src/{test.spec.ts => test.nonspec.ts} (100%) diff --git a/packages/malloy-db-trino/src/test.spec.ts b/packages/malloy-db-trino/src/test.nonspec.ts similarity index 100% rename from packages/malloy-db-trino/src/test.spec.ts rename to packages/malloy-db-trino/src/test.nonspec.ts From d3f2437fb2d5844e5001b8879da8a22c174f0c1a Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 17:26:36 -0700 Subject: [PATCH 11/23] Tweak log entry to try to get more info. --- packages/malloy-db-trino/src/trino_connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index 72dff3c65..bcc462852 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -680,7 +680,7 @@ export abstract class TrinoPrestoConnection const rows: string[][] = (queryResult.rows as string[][]) ?? 
[]; this.structDefFromSchema(rows, structDef); } catch (e) { - throw new Error(`Could not fetch schema for ${element} ${e}`); + throw new Error(`Could not fetch schema for ${element} ${JSON.stringify(e as Error)} ${(e as Error).stack}`); } return structDef; From 4f1d819385bc84547fe36a7bad729cf80dd53ed6 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 17:50:19 -0700 Subject: [PATCH 12/23] Add empty password. --- .github/workflows/db-trino.yaml | 1 + packages/malloy-db-trino/src/trino_connection.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index 5869bc64b..6367f07a5 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -40,3 +40,4 @@ jobs: TRINO_SCHEMA: malloytest TRINO_SERVER: http://localhost:8090 TRINO_USER: malloy-ci-bot@malloydata.org + TRINO_PASSWORD: '' diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index bcc462852..0e8d7ad17 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -115,6 +115,7 @@ class TrinooBase implements BaseConnection { } async runSQL(sql: string, limit: number | undefined) { const result = await this.client.query(sql); + console.log(`==> SOMETHING SUCCEEDED ${sql}`); let queryResult = await result.next(); const columns = queryResult.value.columns; From d3c63396bf5cca475a386f6e1f8c0290a90bb642 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 17:59:15 -0700 Subject: [PATCH 13/23] Empty password. 
--- .github/workflows/db-trino.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index 6367f07a5..d0a7d9912 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -40,4 +40,4 @@ jobs: TRINO_SCHEMA: malloytest TRINO_SERVER: http://localhost:8090 TRINO_USER: malloy-ci-bot@malloydata.org - TRINO_PASSWORD: '' + TRINO_PASSWORD: "" From 807aac2acbccdf939d3a4f0ddf7a129b000a5e36 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:04:51 -0700 Subject: [PATCH 14/23] Push trino logs as artifacts. --- .github/workflows/db-trino.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index d0a7d9912..56dda60c3 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -41,3 +41,9 @@ jobs: TRINO_SERVER: http://localhost:8090 TRINO_USER: malloy-ci-bot@malloydata.org TRINO_PASSWORD: "" + - name: Archive production artifacts + uses: actions/upload-artifact@v4 + with: + name: trino-logs + path: | + .tmp/** From b73074a7aaaf3fb66400a1b6a40451baed2c78ea Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:15:23 -0700 Subject: [PATCH 15/23] Add always. 
--- .github/workflows/db-trino.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index 56dda60c3..c3e849511 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -42,6 +42,7 @@ jobs: TRINO_USER: malloy-ci-bot@malloydata.org TRINO_PASSWORD: "" - name: Archive production artifacts + if: always() uses: actions/upload-artifact@v4 with: name: trino-logs From 9fffe43beb4bc854f4141e1ee04e59fb3f8f4b29 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:19:47 -0700 Subject: [PATCH 16/23] Wait until port 8090 is ready --- test/trino/trino_start.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/trino/trino_start.sh b/test/trino/trino_start.sh index 2581d163e..d90ca2b18 100755 --- a/test/trino/trino_start.sh +++ b/test/trino/trino_start.sh @@ -31,4 +31,9 @@ do fi done +while ! nc -z localhost 8090; do + sleep 5 + echo "Waiting for Trino port to be ready" +done + echo "Trino running on port localhost:8090" \ No newline at end of file From 5b038d69063a9ed018a650cd87a582e434d49845 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:30:51 -0700 Subject: [PATCH 17/23] Output docker logs. --- .github/workflows/db-trino.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index c3e849511..bd05ed166 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -41,6 +41,9 @@ jobs: TRINO_SERVER: http://localhost:8090 TRINO_USER: malloy-ci-bot@malloydata.org TRINO_PASSWORD: "" + - name: grab docker logs + run: | + docker logs --since=1h trino-malloy - name: Archive production artifacts if: always() uses: actions/upload-artifact@v4 From e0c29faf2e22bc837aa0b2b072320add947c9dd3 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:34:33 -0700 Subject: [PATCH 18/23] Always show docker logs. 
--- .github/workflows/db-trino.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index bd05ed166..8f3291b8c 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -41,7 +41,8 @@ jobs: TRINO_SERVER: http://localhost:8090 TRINO_USER: malloy-ci-bot@malloydata.org TRINO_PASSWORD: "" - - name: grab docker logs + - name: show docker logs + if: always() run: | docker logs --since=1h trino-malloy - name: Archive production artifacts From 3d2ab9e9d5734cf1ba71c9702c7a16d9df39fa4b Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:44:06 -0700 Subject: [PATCH 19/23] Try using port 8080. --- .github/workflows/db-trino.yaml | 2 +- test/trino/trino_start.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index 8f3291b8c..e766e79b8 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -38,7 +38,7 @@ jobs: BQ_CREDENTIALS_KEY: ${{ secrets.BQ_PRESTO_TRINO_KEY }} TRINO_CATALOG: bigquery TRINO_SCHEMA: malloytest - TRINO_SERVER: http://localhost:8090 + TRINO_SERVER: http://localhost:8080 TRINO_USER: malloy-ci-bot@malloydata.org TRINO_PASSWORD: "" - name: show docker logs diff --git a/test/trino/trino_start.sh b/test/trino/trino_start.sh index d90ca2b18..95a5887b0 100755 --- a/test/trino/trino_start.sh +++ b/test/trino/trino_start.sh @@ -12,7 +12,7 @@ bigquery.arrow-serialization.enabled=false EOF # run docker -docker run -p 8090:8090 -d -v ./.tmp/bigquery.properties:/etc/trino/catalog/bigquery.properties --name trino-malloy trinodb/trino +docker run -p 8080:8080 -d -v ./.tmp/bigquery.properties:/etc/trino/catalog/bigquery.properties --name trino-malloy trinodb/trino # wait for server to start counter=0 From acb54a4c6ece9f4b0547676e6e9b3b0b72df32b9 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:45:59 
-0700 Subject: [PATCH 20/23] Remove loop that waits for port 8080. --- test/trino/trino_start.sh | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/test/trino/trino_start.sh b/test/trino/trino_start.sh index 95a5887b0..dcac981e4 100755 --- a/test/trino/trino_start.sh +++ b/test/trino/trino_start.sh @@ -31,9 +31,4 @@ do fi done -while ! nc -z localhost 8090; do - sleep 5 - echo "Waiting for Trino port to be ready" -done - -echo "Trino running on port localhost:8090" \ No newline at end of file +echo "Trino running on port localhost:8080" \ No newline at end of file From a1468287cd8c1aa0ae7c124d40246fb307358e77 Mon Sep 17 00:00:00 2001 From: Fernando Arreola Date: Thu, 11 Jul 2024 18:56:39 -0700 Subject: [PATCH 21/23] Remove unnecessary logs. --- .github/workflows/db-trino.yaml | 2 +- packages/malloy-db-trino/src/trino_connection.ts | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index e766e79b8..ed52d1731 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -41,7 +41,7 @@ jobs: TRINO_SERVER: http://localhost:8080 TRINO_USER: malloy-ci-bot@malloydata.org TRINO_PASSWORD: "" - - name: show docker logs + - name: Show docker logs if: always() run: | docker logs --since=1h trino-malloy diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index 0e8d7ad17..f8ede1f39 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -115,7 +115,6 @@ class TrinooBase implements BaseConnection { } async runSQL(sql: string, limit: number | undefined) { const result = await this.client.query(sql); - console.log(`==> SOMETHING SUCCEEDED ${sql}`); let queryResult = await result.next(); const columns = queryResult.value.columns; @@ -681,7 +680,7 @@ export abstract class TrinoPrestoConnection const rows: string[][] =
(queryResult.rows as string[][]) ?? []; this.structDefFromSchema(rows, structDef); } catch (e) { - throw new Error(`Could not fetch schema for ${element} ${JSON.stringify(e as Error)} ${(e as Error).stack}`); + throw new Error(`Could not fetch schema for ${element} ${JSON.stringify(e as Error)}`); } return structDef; From 751c043cbb8588c77da3fcdd7098f8bd65e15239 Mon Sep 17 00:00:00 2001 From: Lloyd Tabb Date: Fri, 12 Jul 2024 06:16:13 -0700 Subject: [PATCH 22/23] Make runSQL return errors instead of throwing. Make password optional in trino. --- .github/workflows/db-trino.yaml | 1 - .../malloy-db-trino/src/trino_connection.ts | 49 +++++++++++++------ .../malloy-db-trino/src/trino_executor.ts | 3 +- 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index ed52d1731..6d8015279 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -40,7 +40,6 @@ jobs: TRINO_SCHEMA: malloytest TRINO_SERVER: http://localhost:8080 TRINO_USER: malloy-ci-bot@malloydata.org - TRINO_PASSWORD: "" - name: Show docker logs if: always() run: | diff --git a/packages/malloy-db-trino/src/trino_connection.ts b/packages/malloy-db-trino/src/trino_connection.ts index f8ede1f39..bc323b32f 100644 --- a/packages/malloy-db-trino/src/trino_connection.ts +++ b/packages/malloy-db-trino/src/trino_connection.ts @@ -71,7 +71,11 @@ export interface BaseConnection { runSQL( sql: string, limit: number | undefined - ): Promise<{rows: unknown[][]; columns: {name: string; type: string}[]}>; + ): Promise<{ + rows: unknown[][]; + columns: {name: string; type: string; error?: string}[]; + error?: string; + }>; } class PrestoBase implements BaseConnection { @@ -87,18 +91,23 @@ class PrestoBase implements BaseConnection { }); } async runSQL(sql: string, limit: number | undefined) { - let ret: PrestoQuery; + let ret: PrestoQuery | undefined = undefined; const q = limit ? 
`SELECT * FROM(${sql}) LIMIT ${limit}` : sql; + let error: string | undefined = undefined; try { ret = (await this.client.query(q)) || []; // console.log(ret); - } catch (error) { + } catch (errorObj) { // console.log(error); - throw new Error(error); + error = errorObj.toString(); } return { - rows: ret.data || [], - columns: ret.columns as {name: string; type: string}[], + rows: ret && ret.data ? ret.data : [], + columns: + ret && ret.columns + ? (ret.columns as {name: string; type: string}[]) + : [], + error, }; } } @@ -110,12 +119,19 @@ class TrinooBase implements BaseConnection { catalog: config.catalog, server: config.server, schema: config.schema, - auth: new BasicAuth(config.user!, config.password), + auth: new BasicAuth(config.user!, config.password || ''), }); } async runSQL(sql: string, limit: number | undefined) { const result = await this.client.query(sql); let queryResult = await result.next(); + if (queryResult.value.error) { + return { + rows: [], + columns: [], + error: JSON.stringify(queryResult.value.error), + }; + } const columns = queryResult.value.columns; const outputRows: unknown[][] = []; @@ -668,19 +684,20 @@ export abstract class TrinoPrestoConnection try { const queryResult = await this.client.runSQL(sqlBlock, undefined); - // if (queryResult.error) { - // // TODO: handle. - // throw new Error( - // `Failed to grab schema for ${element}: ${JSON.stringify( - // queryResult.value.error - // )}` - // ); - // } + if (queryResult.error) { + // TODO: handle. + throw new Error( + `Failed to grab schema for ${queryResult.error} + )}` + ); + } const rows: string[][] = (queryResult.rows as string[][]) ?? 
[]; this.structDefFromSchema(rows, structDef); } catch (e) { - throw new Error(`Could not fetch schema for ${element} ${JSON.stringify(e as Error)}`); + throw new Error( + `Could not fetch schema for ${element} ${JSON.stringify(e as Error)}` + ); } return structDef; diff --git a/packages/malloy-db-trino/src/trino_executor.ts b/packages/malloy-db-trino/src/trino_executor.ts index e5edf4e04..eddc06f32 100644 --- a/packages/malloy-db-trino/src/trino_executor.ts +++ b/packages/malloy-db-trino/src/trino_executor.ts @@ -56,7 +56,7 @@ export class TrinoExecutor { // TODO(figutierrez): We may not need to support these. const catalog = process.env[`${envPrefix}_CATALOG`]; const schema = process.env[`${envPrefix}_SCHEMA`]; - return { + const ret = { server, user, port, @@ -64,5 +64,6 @@ export class TrinoExecutor { catalog, schema, }; + return ret; } } From d577de57545de9b4f945028bb82dffc37d58a7d2 Mon Sep 17 00:00:00 2001 From: Lloyd Tabb Date: Fri, 12 Jul 2024 06:35:16 -0700 Subject: [PATCH 23/23] remove the show logs... --- .github/workflows/db-trino.yaml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/.github/workflows/db-trino.yaml b/.github/workflows/db-trino.yaml index 6d8015279..a4dce471b 100644 --- a/.github/workflows/db-trino.yaml +++ b/.github/workflows/db-trino.yaml @@ -40,14 +40,14 @@ jobs: TRINO_SCHEMA: malloytest TRINO_SERVER: http://localhost:8080 TRINO_USER: malloy-ci-bot@malloydata.org - - name: Show docker logs - if: always() - run: | - docker logs --since=1h trino-malloy - - name: Archive production artifacts - if: always() - uses: actions/upload-artifact@v4 - with: - name: trino-logs - path: | - .tmp/** + # - name: Show docker logs + # if: always() + # run: | + # docker logs --since=1h trino-malloy + # - name: Archive production artifacts + # if: always() + # uses: actions/upload-artifact@v4 + # with: + # name: trino-logs + # path: | + # .tmp/**