Skip to content

Commit

Permalink
Merge pull request #58 from outerbase/invisal/rework-query-build
Browse files Browse the repository at this point in the history
BigQuery primary key support, binding NULL value and auto casting
  • Loading branch information
invisal authored Oct 28, 2024
2 parents 2ca1b70 + 2c4e2b3 commit 6d03a81
Show file tree
Hide file tree
Showing 9 changed files with 492 additions and 230 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@outerbase/sdk",
"version": "2.0.0-rc.0",
"version": "2.0.0-rc.1",
"description": "",
"main": "dist/index.js",
"module": "dist/index.js",
Expand Down
333 changes: 253 additions & 80 deletions src/connections/bigquery.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { QueryType } from '../query-params';
import { Query } from '../query';
import { QueryResult } from './index';
import { Database, Table, TableColumn } from '../models/database';
import { ConnectionSelectOptions, QueryResult } from './index';
import { Database, Schema, Table, TableColumn } from '../models/database';
import { BigQueryDialect } from '../query-builder/dialects/bigquery';
import { BigQuery } from '@google-cloud/bigquery';
import {
Expand All @@ -10,49 +10,188 @@ import {
} from './../utils/transformer';
import { SqlConnection } from './sql-base';

// BigQuery column types whose bound values should be coerced with
// Number() before being sent (covers both legacy and standard SQL
// spellings of the integer/float/decimal types).
const NUMERIC_TYPE = [
    'INT64',
    'FLOAT64',
    'INTEGER',
    'FLOAT',
    'NUMERIC',
    'BIGNUMERIC',
];
export class BigQueryConnection extends SqlConnection {
    // Underlying Google Cloud BigQuery client, injected via the constructor.
    bigQuery: BigQuery;

    // Default query type to positional for BigQuery
    queryType = QueryType.positional;

    // Default dialect for BigQuery
    dialect = new BigQueryDialect();
    // Cache of column-name -> BigQuery-type maps, populated by getFields().
    cacheFields: Record<string, Record<string, string>> = {};

    /**
     * Creates a new BigQuery connection wrapper.
     *
     * @param bigQuery - A pre-configured Google Cloud BigQuery client
     *                   instance (typed `any` here; presumably always a
     *                   `BigQuery` — TODO confirm and tighten the type).
     */
    constructor(bigQuery: any) {
        super();
        this.bigQuery = bigQuery;
    }

/**
* Performs a connect action on the current Connection object.
* In this particular use case, BigQuery has no connect
* So this is a no-op
*
* @param details - Unused in the BigQuery scenario.
* @returns Promise<any>
*/
async connect(): Promise<any> {
return Promise.resolve();
}

/**
* Performs a disconnect action on the current Connection object.
* In this particular use case, BigQuery has no disconnect
* So this is a no-op
*
* @returns Promise<any>
*/
async disconnect(): Promise<any> {
return Promise.resolve();
}

    /**
     * Creates a table after stripping foreign-key `references` from the
     * column definitions — BigQuery does not enforce them.
     *
     * @param schemaName - Dataset (schema) to create the table in.
     * @param tableName - Name of the new table.
     * @param columns - Column definitions; cloned so the caller's objects
     *                  are not mutated.
     */
    createTable(
        schemaName: string | undefined,
        tableName: string,
        columns: TableColumn[]
    ): Promise<QueryResult> {
        // Strip unsupported REFERENCES constraints from a deep copy.
        const tempColumns = structuredClone(columns);
        for (const column of tempColumns) {
            delete column.definition.references;
        }

        return super.createTable(schemaName, tableName, tempColumns);
    }

async getFields(
schemaName: string,
tableName: string
): Promise<Record<string, string>> {
if (this.cacheFields[schemaName]) return this.cacheFields[schemaName];

if (!schemaName)
throw new Error('Schema name is required for BigQuery');

const [metadata] = await this.bigQuery
.dataset(schemaName)
.table(tableName)
.getMetadata();

const fields: { name: string; type: string }[] = metadata.schema.fields;
const fieldsType: Record<string, string> = fields.reduce(
(acc, field) => {
acc[field.name] = field.type;
return acc;
},
{} as Record<string, string>
);

this.cacheFields[schemaName] = fieldsType;
return fieldsType;
}

transformTypedValue(type: string, value: unknown) {
if (value === null) return value;

if (NUMERIC_TYPE.includes(type)) {
return Number(value);
}

return value;
}

async autoCastingType(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>
): Promise<Record<string, unknown>> {
const tmp = structuredClone(data);

if (!schemaName)
throw new Error('Schema name is required for BigQuery');

const fieldsType: Record<string, string> = await this.getFields(
schemaName,
tableName
);

for (const key in tmp) {
const type = fieldsType[key];
if (!type) continue;
tmp[key] = this.transformTypedValue(type, tmp[key]);
}

return tmp;
}

async insert(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>
): Promise<QueryResult> {
return super.insert(
schemaName,
tableName,
await this.autoCastingType(schemaName, tableName, data)
);
}

async insertMany(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>[]
): Promise<QueryResult> {
const newData: Record<string, unknown>[] = [];

for (const item of data) {
newData.push(
await this.autoCastingType(schemaName, tableName, item)
);
}

return super.insertMany(schemaName, tableName, newData);
}

async update(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>,
where: Record<string, unknown>
): Promise<QueryResult> {
return super.update(
schemaName,
tableName,
await this.autoCastingType(schemaName, tableName, data),
await this.autoCastingType(schemaName, tableName, where)
);
}

async delete(
schemaName: string,
tableName: string,
where: Record<string, unknown>
): Promise<QueryResult> {
return super.delete(
schemaName,
tableName,
await this.autoCastingType(schemaName, tableName, where)
);
}

async select(
schemaName: string,
tableName: string,
options: ConnectionSelectOptions
): Promise<QueryResult> {
// Auto casting the where
let where = options.where;

if (where && where.length > 0) {
const fields = await this.getFields(schemaName, tableName);
where = where.map((t) => {
const type = fields[t.name];
if (!type) return t;

return {
...t,
value: this.transformTypedValue(type, t.value),
};
});
}

return super.select(schemaName, tableName, {
...options,
where,
});
}

/**
* Triggers a query action on the current Connection object.
*
Expand Down Expand Up @@ -86,70 +225,104 @@ export class BigQueryConnection extends SqlConnection {
}
}

createTable(
schemaName: string | undefined,
tableName: string,
columns: TableColumn[]
): Promise<QueryResult> {
// BigQuery does not support PRIMARY KEY. We can remove if here
const tempColumns = structuredClone(columns);
for (const column of tempColumns) {
delete column.definition.primaryKey;
delete column.definition.references;
}
public async fetchDatabaseSchema(): Promise<Database> {
const [datasetList] = await this.bigQuery.getDatasets();

return super.createTable(schemaName, tableName, tempColumns);
}
// Construct the query to get all the table in one go
const sql = datasetList
.map((dataset) => {
const schemaPath = `${this.bigQuery.projectId}.${dataset.id}`;

public async fetchDatabaseSchema(): Promise<Database> {
const database: Database = {};
return `(
SELECT
a.table_schema,
a.table_name,
a.column_name,
a.data_type,
b.constraint_schema,
b.constraint_name,
c.constraint_type
FROM \`${schemaPath}.INFORMATION_SCHEMA.COLUMNS\` AS a LEFT JOIN \`${schemaPath}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE\` AS b ON (
a.table_schema = b.table_schema AND
a.table_name = b.table_name AND
a.column_name = b.column_name
) LEFT JOIN \`${schemaPath}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS\` AS c ON (
b.constraint_schema = c.constraint_schema AND
b.constraint_name = c.constraint_name
)
)`;
})
.join(' UNION ALL ');

// Fetch all datasets
const [datasets] = await this.bigQuery.getDatasets();
if (datasets.length === 0) {
throw new Error('No datasets found in the project.');
}
const { data } = await this.query<{
table_schema: string;
table_name: string;
column_name: string;
data_type: string;
constraint_schema: string;
constraint_name: string;
constraint_type: null | 'PRIMARY KEY' | 'FOREIGN KEY';
}>({ query: sql });

// Group the database schema by table
const database: Database = datasetList.reduce(
(acc, dataset) => {
acc[dataset.id ?? ''] = {};
return acc;
},
{} as Record<string, Schema>
);

// Iterate over each dataset
for (const dataset of datasets) {
const datasetId = dataset.id;
if (!datasetId) continue;
// Group the table by database
data.forEach((row) => {
const schema = database[row.table_schema];
if (!schema) {
return;
}

const [tables] = await dataset.getTables();
const table = schema[row.table_name] ?? {
name: row.table_name,
columns: [],
indexes: [],
constraints: [],
};

if (!database[datasetId]) {
database[datasetId] = {}; // Initialize schema in the database
if (!schema[row.table_name]) {
schema[row.table_name] = table;
}

for (const table of tables) {
const [metadata] = await table.getMetadata();

const columns = metadata.schema.fields.map(
(field: any, index: number): TableColumn => {
return {
name: field.name,
position: index,
definition: {
type: field.type,
nullable: field.mode === 'NULLABLE',
default: null, // BigQuery does not support default values in the schema metadata
primaryKey: false, // BigQuery does not have a concept of primary keys
unique: false, // BigQuery does not have a concept of unique constraints
},
};
}
// Add the column to the table
table.columns.push({
name: row.column_name,
definition: {
type: row.data_type,
primaryKey: row.constraint_type === 'PRIMARY KEY',
},
});

// Add the constraint to the table
if (row.constraint_name && row.constraint_type === 'PRIMARY KEY') {
let constraint = table.constraints.find(
(c) => c.name === row.constraint_name
);

const currentTable: Table = {
name: table.id ?? '',
columns: columns,
indexes: [], // BigQuery does not support indexes
constraints: [], // BigQuery does not support primary keys, foreign keys, or unique constraints
};
if (!constraint) {
constraint = {
name: row.constraint_name,
schema: row.constraint_schema,
tableName: row.table_name,
type: row.constraint_type,
columns: [],
};

table.constraints.push(constraint);
}

database[datasetId][table.id ?? ''] = currentTable;
constraint.columns.push({
columnName: row.column_name,
});
}
}
});

return database;
}
Expand Down
Loading

0 comments on commit 6d03a81

Please sign in to comment.