Skip to content

Commit

Permalink
fixing bigquery cannot update with null and detect primary key
Browse files Browse the repository at this point in the history
  • Loading branch information
invisal committed Oct 28, 2024
1 parent b4ea6f2 commit be44742
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 93 deletions.
188 changes: 105 additions & 83 deletions src/connections/bigquery.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { QueryType } from '../query-params';
import { Query } from '../query';
import { QueryResult } from './index';
import { Database, Table, TableColumn } from '../models/database';
import { Database, Schema, Table, TableColumn } from '../models/database';
import { BigQueryDialect } from '../query-builder/dialects/bigquery';
import { BigQuery } from '@google-cloud/bigquery';
import {
Expand All @@ -12,47 +12,35 @@ import { SqlConnection } from './sql-base';

export class BigQueryConnection extends SqlConnection {
bigQuery: BigQuery;

// Default query type to positional for BigQuery
queryType = QueryType.positional;

// Default dialect for BigQuery
dialect = new BigQueryDialect();

/**
* Creates a new BigQuery object.
*
* @param keyFileName - Path to a .json, .pem, or .p12 key file.
* @param region - Region for your dataset
*/
constructor(bigQuery: any) {
super();
this.bigQuery = bigQuery;
}

/**
* Performs a connect action on the current Connection object.
* In this particular use case, BigQuery has no connect
* So this is a no-op
*
* @param details - Unused in the BigQuery scenario.
* @returns Promise<any>
*/
async connect(): Promise<any> {
return Promise.resolve();
}

/**
* Performs a disconnect action on the current Connection object.
* In this particular use case, BigQuery has no disconnect
* So this is a no-op
*
* @returns Promise<any>
*/
async disconnect(): Promise<any> {
return Promise.resolve();
}

createTable(
schemaName: string | undefined,
tableName: string,
columns: TableColumn[]
): Promise<QueryResult> {
// BigQuery does not support PRIMARY KEY. We can remove if here
const tempColumns = structuredClone(columns);
for (const column of tempColumns) {
delete column.definition.references;
}

return super.createTable(schemaName, tableName, tempColumns);
}

/**
* Triggers a query action on the current Connection object.
*
Expand Down Expand Up @@ -86,70 +74,104 @@ export class BigQueryConnection extends SqlConnection {
}
}

createTable(
schemaName: string | undefined,
tableName: string,
columns: TableColumn[]
): Promise<QueryResult> {
// BigQuery does not support PRIMARY KEY. We can remove if here
const tempColumns = structuredClone(columns);
for (const column of tempColumns) {
delete column.definition.primaryKey;
delete column.definition.references;
}

return super.createTable(schemaName, tableName, tempColumns);
}

public async fetchDatabaseSchema(): Promise<Database> {
const database: Database = {};

// Fetch all datasets
const [datasets] = await this.bigQuery.getDatasets();
if (datasets.length === 0) {
throw new Error('No datasets found in the project.');
}

// Iterate over each dataset
for (const dataset of datasets) {
const datasetId = dataset.id;
if (!datasetId) continue;
const [datasetList] = await this.bigQuery.getDatasets();

// Construct the query to get all the table in one go
const sql = datasetList
.map((dataset) => {
const schemaPath = `${this.bigQuery.projectId}.${dataset.id}`;

return `(
SELECT
a.table_schema,
a.table_name,
a.column_name,
a.data_type,
b.constraint_schema,
b.constraint_name,
c.constraint_type
FROM \`${schemaPath}.INFORMATION_SCHEMA.COLUMNS\` AS a LEFT JOIN \`${schemaPath}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE\` AS b ON (
a.table_schema = b.table_schema AND
a.table_name = b.table_name AND
a.column_name = b.column_name
) LEFT JOIN \`${schemaPath}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS\` AS c ON (
b.constraint_schema = c.constraint_schema AND
b.constraint_name = c.constraint_name
)
)`;
})
.join(' UNION ALL ');

const { data } = await this.query<{
table_schema: string;
table_name: string;
column_name: string;
data_type: string;
constraint_schema: string;
constraint_name: string;
constraint_type: null | 'PRIMARY KEY' | 'FOREIGN KEY';
}>({ query: sql });

// Group the database schema by table
const database: Database = datasetList.reduce(
(acc, dataset) => {
acc[dataset.id ?? ''] = {};
return acc;
},
{} as Record<string, Schema>
);

// Group the table by database
data.forEach((row) => {
const schema = database[row.table_schema];
if (!schema) {
return;
}

const [tables] = await dataset.getTables();
const table = schema[row.table_name] ?? {
name: row.table_name,
columns: [],
indexes: [],
constraints: [],
};

if (!database[datasetId]) {
database[datasetId] = {}; // Initialize schema in the database
if (!schema[row.table_name]) {
schema[row.table_name] = table;
}

for (const table of tables) {
const [metadata] = await table.getMetadata();

const columns = metadata.schema.fields.map(
(field: any, index: number): TableColumn => {
return {
name: field.name,
position: index,
definition: {
type: field.type,
nullable: field.mode === 'NULLABLE',
default: null, // BigQuery does not support default values in the schema metadata
primaryKey: false, // BigQuery does not have a concept of primary keys
unique: false, // BigQuery does not have a concept of unique constraints
},
};
}
// Add the column to the table
table.columns.push({
name: row.column_name,
definition: {
type: row.data_type,
primaryKey: row.constraint_type === 'PRIMARY KEY',
},
});

// Add the constraint to the table
if (row.constraint_name && row.constraint_type === 'PRIMARY KEY') {
let constraint = table.constraints.find(
(c) => c.name === row.constraint_name
);

const currentTable: Table = {
name: table.id ?? '',
columns: columns,
indexes: [], // BigQuery does not support indexes
constraints: [], // BigQuery does not support primary keys, foreign keys, or unique constraints
};

database[datasetId][table.id ?? ''] = currentTable;
if (!constraint) {
constraint = {
name: row.constraint_name,
schema: row.constraint_schema,
tableName: row.table_name,
type: row.constraint_type,
columns: [],
};

table.constraints.push(constraint);
}

constraint.columns.push({
columnName: row.column_name,
});
}
}
});

return database;
}
Expand Down
2 changes: 2 additions & 0 deletions src/query-builder/dialects/bigquery.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { MySQLDialect } from './mysql';
export class BigQueryDialect extends MySQLDialect {
protected ALWAY_NO_ENFORCED_CONSTRAINT = true;

escapeId(identifier: string): string {
return `\`${identifier}\``;
}
Expand Down
36 changes: 33 additions & 3 deletions src/query-builder/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ export abstract class AbstractDialect implements Dialect {
protected AUTO_INCREMENT_KEYWORD = 'AUTO_INCREMENT';
protected SUPPORT_COLUMN_COMMENT = true;

// BigQuery does not support enforced constraint
// This flag is primary for BigQuery only.
protected ALWAY_NO_ENFORCED_CONSTRAINT = false;

escapeId(identifier: string): string {
return identifier
.split('.')
Expand Down Expand Up @@ -134,6 +138,15 @@ export abstract class AbstractDialect implements Dialect {
}
return merged;
} else {
// BigQuery does not provide easy way to bind NULL value,
// so we will skip binding NULL values and use raw NULL in query
if (where.value === null) {
return [
`${this.escapeId(where.column)} ${where.operator} NULL`,
[],
];
}

return [
`${this.escapeId(where.column)} ${where.operator} ?`,
[where.value],
Expand Down Expand Up @@ -177,6 +190,10 @@ export abstract class AbstractDialect implements Dialect {
const bindings: unknown[] = [];

const setClauses = columns.map((column) => {
// BigQuery does not provide easy way to bind NULL value,
// so we will skip binding NULL values and use raw NULL in query
if (data[column] === null) return `${this.escapeId(column)} = NULL`;

bindings.push(data[column]);
return `${this.escapeId(column)} = ?`;
});
Expand All @@ -199,11 +216,20 @@ export abstract class AbstractDialect implements Dialect {
const bindings: unknown[] = [];

const columnNames = columns.map((column) => {
bindings.push(data[column]);
// BigQuery does not provide easy way to bind NULL value,
// so we will skip binding NULL values and use raw NULL in query
if (data[column] !== null) bindings.push(data[column]);
return this.escapeId(column);
});

const placeholders = columns.map(() => '?').join(', ');
const placeholders = columns
.map((column) => {
// BigQuery does not provide easy way to bind NULL value,
// so we will skip binding NULL values and use raw NULL in query
if (data[column] === null) return 'NULL';
return '?';
})
.join(', ');

return [
`(${columnNames.join(', ')}) VALUES(${placeholders})`,
Expand All @@ -218,6 +244,9 @@ export abstract class AbstractDialect implements Dialect {
def.nullable === false ? 'NOT NULL' : '',
def.invisible ? 'INVISIBLE' : '', // This is for MySQL case
def.primaryKey ? 'PRIMARY KEY' : '',
def.primaryKey && this.ALWAY_NO_ENFORCED_CONSTRAINT
? 'NOT ENFORCED'
: '',
def.unique ? 'UNIQUE' : '',
def.default ? `DEFAULT ${this.escapeValue(def.default)}` : '',
def.defaultExpression ? `DEFAULT (${def.defaultExpression})` : '',
Expand Down Expand Up @@ -278,7 +307,7 @@ export abstract class AbstractDialect implements Dialect {
const tableName = builder.table;

if (!tableName) {
throw new Error('Table name is required to build a UPDATE query.');
throw new Error('Table name is required to build a INSERT query.');
}

// Remove all empty value from object and check if there is any data to update
Expand Down Expand Up @@ -369,6 +398,7 @@ export abstract class AbstractDialect implements Dialect {
ref.match ? `MATCH ${ref.match}` : '',
ref.onDelete ? `ON DELETE ${ref.onDelete}` : '',
ref.onUpdate ? `ON UPDATE ${ref.onUpdate}` : '',
this.ALWAY_NO_ENFORCED_CONSTRAINT ? 'NOT ENFORCED' : '',
]
.filter(Boolean)
.join(' ');
Expand Down
29 changes: 27 additions & 2 deletions tests/connections/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ describe('Database Connection', () => {
expect(fkConstraint!.referenceTableName).toBe('teams');
expect(fkConstraint!.columns[0].referenceColumnName).toBe('id');
}

// Check the primary key
if (process.env.CONNECTION_TYPE !== 'mongodb') {
const pkList = Object.values(schemas[DEFAULT_SCHEMA])
.map((c) => c.constraints)
.flat()
.filter((c) => c.type === 'PRIMARY KEY')
.map((constraint) =>
constraint.columns.map(
(column) =>
`${constraint.tableName}.${column.columnName}`
)
)
.flat()
.sort();

expect(pkList).toEqual(['persons.id', 'teams.id']);
}
});

test('Select data', async () => {
Expand Down Expand Up @@ -361,6 +379,10 @@ describe('Database Connection', () => {
});

test('Rename table name', async () => {
// Skip BigQuery because you cannot rename table with
// primary key column
if (process.env.CONNECTION_TYPE === 'bigquery') return;

const { error } = await db.renameTable(
DEFAULT_SCHEMA,
'persons',
Expand All @@ -374,12 +396,15 @@ describe('Database Connection', () => {
});

expect(cleanup(data).length).toEqual(2);

// Revert the operation back
await db.renameTable(DEFAULT_SCHEMA, 'people', 'persons');
});

test('Delete a row', async () => {
await db.delete(DEFAULT_SCHEMA, 'people', { id: 1 });
await db.delete(DEFAULT_SCHEMA, 'persons', { id: 1 });

const { data } = await db.select(DEFAULT_SCHEMA, 'people', {
const { data } = await db.select(DEFAULT_SCHEMA, 'persons', {
orderBy: ['id'],
});

Expand Down
Loading

0 comments on commit be44742

Please sign in to comment.