Skip to content

Commit

Permalink
Add: location support when connecting bigquery
Browse files Browse the repository at this point in the history
  • Loading branch information
Fraggle committed Jan 31, 2025
1 parent 79a5743 commit e2e0649
Show file tree
Hide file tree
Showing 16 changed files with 695 additions and 153 deletions.
15 changes: 10 additions & 5 deletions connectors/src/connectors/bigquery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import type {
ContentNodesViewType,
Result,
} from "@dust-tt/types";
import { assertNever, Err, isBigQueryCredentials, Ok } from "@dust-tt/types";
import {
assertNever,
Err,
isBigQueryWithLocationCredentials,
Ok,
} from "@dust-tt/types";

import type { TestConnectionError } from "@connectors/connectors/bigquery/lib/bigquery_api";
import { testConnection } from "@connectors/connectors/bigquery/lib/bigquery_api";
Expand Down Expand Up @@ -68,7 +73,7 @@ export class BigQueryConnectorManager extends BaseConnectorManager<null> {
}): Promise<Result<string, ConnectorManagerError<CreateConnectorErrorCode>>> {
const credentialsRes = await getCredentials({
credentialsId: connectionId,
isTypeGuard: isBigQueryCredentials,
isTypeGuard: isBigQueryWithLocationCredentials,
logger,
});
if (credentialsRes.isErr()) {
Expand Down Expand Up @@ -125,7 +130,7 @@ export class BigQueryConnectorManager extends BaseConnectorManager<null> {

const newCredentialsRes = await getCredentials({
credentialsId: connectionId,
isTypeGuard: isBigQueryCredentials,
isTypeGuard: isBigQueryWithLocationCredentials,
logger,
});
if (newCredentialsRes.isErr()) {
Expand Down Expand Up @@ -250,7 +255,7 @@ export class BigQueryConnectorManager extends BaseConnectorManager<null> {
> {
const connectorAndCredentialsRes = await getConnectorAndCredentials({
connectorId: this.connectorId,
isTypeGuard: isBigQueryCredentials,
isTypeGuard: isBigQueryWithLocationCredentials,
logger,
});
if (connectorAndCredentialsRes.isErr()) {
Expand Down Expand Up @@ -319,7 +324,7 @@ export class BigQueryConnectorManager extends BaseConnectorManager<null> {
}): Promise<Result<void, Error>> {
const connectorAndCredentialsRes = await getConnectorAndCredentials({
connectorId: this.connectorId,
isTypeGuard: isBigQueryCredentials,
isTypeGuard: isBigQueryWithLocationCredentials,
logger,
});
if (connectorAndCredentialsRes.isErr()) {
Expand Down
29 changes: 23 additions & 6 deletions connectors/src/connectors/bigquery/lib/bigquery_api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { BigQueryCredentials, Result } from "@dust-tt/types";
import type { BigQueryCredentialsWithLocation, Result } from "@dust-tt/types";
import { Err, Ok, removeNulls } from "@dust-tt/types";
import { BigQuery } from "@google-cloud/bigquery";

Expand Down Expand Up @@ -34,7 +34,7 @@ export function isTestConnectionError(
export const testConnection = async ({
credentials,
}: {
credentials: BigQueryCredentials;
credentials: BigQueryCredentialsWithLocation;
}): Promise<Result<string, TestConnectionError>> => {
// Connect to bigquery, do a simple query.
const bigQuery = connectToBigQuery(credentials);
Expand All @@ -53,17 +53,20 @@ export const testConnection = async ({
}
};

export function connectToBigQuery(credentials: BigQueryCredentials): BigQuery {
/**
 * Builds a BigQuery client from the given credentials.
 *
 * The client is created with the read-only BigQuery OAuth scope and with its
 * `location` option set to `credentials.location`.
 *
 * @param credentials - BigQuery credentials, including the selected location.
 * @returns a new `BigQuery` client instance.
 */
export function connectToBigQuery(
  credentials: BigQueryCredentialsWithLocation
): BigQuery {
  const { location } = credentials;
  const client = new BigQuery({
    credentials,
    scopes: ["https://www.googleapis.com/auth/bigquery.readonly"],
    location,
  });
  return client;
}

export const fetchDatabases = ({
credentials,
}: {
credentials: BigQueryCredentials;
credentials: BigQueryCredentialsWithLocation;
}): RemoteDBDatabase[] => {
// BigQuery does not have a concept of databases per se; the most similar concept is a project.
// Since credentials are always scoped to a project, we directly return a single database with the project name.
Expand All @@ -79,7 +82,7 @@ export const fetchDatasets = async ({
credentials,
connection,
}: {
credentials: BigQueryCredentials;
credentials: BigQueryCredentialsWithLocation;
connection?: BigQuery;
}): Promise<Result<Array<RemoteDBSchema>, Error>> => {
const conn = connection ?? connectToBigQuery(credentials);
Expand All @@ -89,6 +92,14 @@ export const fetchDatasets = async ({
return new Ok(
removeNulls(
datasets.map((dataset) => {
// Keep only datasets located in the location selected in the credentials.
// The comparison is a prefix match so that, for example, a dataset in "us-central1" is kept
// when the selected location is "us". Datasets without a location are dropped.
// The dataset location is lower-cased before the comparison, so the credentials location must be
// lower-cased as well; otherwise an upper-case stored value (e.g. "EU") would never match.
if (
  !dataset.location
    ?.toLowerCase()
    .startsWith(credentials.location.toLowerCase())
) {
  return null;
}

if (!dataset.id) {
return null;
}
Expand All @@ -113,7 +124,7 @@ export const fetchTables = async ({
internalDatasetId,
connection,
}: {
credentials: BigQueryCredentials;
credentials: BigQueryCredentialsWithLocation;
datasetName?: string;
internalDatasetId?: string;
connection?: BigQuery;
Expand Down Expand Up @@ -147,6 +158,12 @@ export const fetchTables = async ({
return new Ok(
removeNulls(
tables.map((table) => {
// Keep only tables located in the location selected in the credentials.
// The comparison is a prefix match so that, for example, a table in "us-central1" is kept
// when the selected location is "us". Tables without a location are dropped.
// The table location is lower-cased before the comparison, so the credentials location must be
// lower-cased as well; otherwise an upper-case stored value (e.g. "EU") would never match.
if (
  !table.location
    ?.toLowerCase()
    .startsWith(credentials.location.toLowerCase())
) {
  return null;
}

if (!table.id) {
return null;
}
Expand Down
4 changes: 2 additions & 2 deletions connectors/src/connectors/bigquery/temporal/activities.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ModelId } from "@dust-tt/types";
import { isBigQueryCredentials, MIME_TYPES } from "@dust-tt/types";
import { isBigQueryWithLocationCredentials, MIME_TYPES } from "@dust-tt/types";

import {
connectToBigQuery,
Expand Down Expand Up @@ -30,7 +30,7 @@ import logger from "@connectors/logger/logger";
export async function syncBigQueryConnection(connectorId: ModelId) {
const getConnectorAndCredentialsRes = await getConnectorAndCredentials({
connectorId,
isTypeGuard: isBigQueryCredentials,
isTypeGuard: isBigQueryWithLocationCredentials,
logger,
});
if (getConnectorAndCredentialsRes.isErr()) {
Expand Down
83 changes: 2 additions & 81 deletions connectors/src/connectors/bigquery/temporal/cast_known_errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,93 +4,14 @@ import type {
Next,
} from "@temporalio/worker";

import { ExternalOAuthTokenError } from "@connectors/lib/error";

/** An `Error` that additionally carries a `data.nextAction` string. */
interface BigQueryError extends Error {
  name: string;
  data: { nextAction: string };
}

/** A `BigQueryError` named "OperationFailedError" whose next action is "PWD_CHANGE". */
interface BigQueryExpiredPasswordError extends BigQueryError {
  name: "OperationFailedError";
  data: { nextAction: "PWD_CHANGE" };
}

/** A `BigQueryError` named "OperationFailedError" whose next action is "RETRY_LOGIN". */
interface BigQueryAccountLockedError extends BigQueryError {
  name: "OperationFailedError";
  data: { nextAction: "RETRY_LOGIN" };
}

/**
 * A `BigQueryError` named "OperationFailedError" whose next action is "RETRY_LOGIN".
 * Structurally identical to the account-locked variant; the two are told apart at
 * runtime by their message, not by their shape.
 */
interface BigQueryIncorrectCredentialsError extends BigQueryError {
  name: "OperationFailedError";
  data: { nextAction: "RETRY_LOGIN" };
}

/**
 * Type guard for `BigQueryError`.
 *
 * @param err - any caught value.
 * @returns true when `err` is an `Error` instance that has a `name`, and a non-null
 *   object `data` property holding a string `nextAction`.
 */
function isBigQueryError(err: unknown): err is BigQueryError {
  // Only genuine Error instances qualify; plain look-alike objects are rejected.
  if (!(err instanceof Error)) {
    return false;
  }
  if (!("name" in err) || !("data" in err)) {
    return false;
  }
  // `typeof null` is "object", hence the explicit null check.
  if (typeof err.data !== "object" || err.data === null) {
    return false;
  }
  return "nextAction" in err.data && typeof err.data.nextAction === "string";
}

/**
 * Type guard for `BigQueryExpiredPasswordError`.
 *
 * @param err - any caught value.
 * @returns true when `err` passes `isBigQueryError` and its `data.nextAction` is "PWD_CHANGE".
 */
function isBigQueryExpiredPasswordError(
  err: unknown
): err is BigQueryExpiredPasswordError {
  if (!isBigQueryError(err)) {
    return false;
  }
  return err.data.nextAction === "PWD_CHANGE";
}

/**
 * Type guard for `BigQueryAccountLockedError`.
 *
 * The check is message-based: `data.nextAction` is not inspected here.
 *
 * @param err - any caught value.
 * @returns true when `err` passes `isBigQueryError` and its message starts with the
 *   account-locked prefix.
 */
function isBigQueryAccountLockedError(
  err: unknown
): err is BigQueryAccountLockedError {
  if (!isBigQueryError(err)) {
    return false;
  }
  const lockedPrefix =
    "Your user account has been temporarily locked due to too many failed attempts";
  return err.message.startsWith(lockedPrefix);
}

/**
 * Type guard for `BigQueryIncorrectCredentialsError`.
 *
 * The check is message-based: `data.nextAction` is not inspected here.
 *
 * @param err - any caught value.
 * @returns true when `err` passes `isBigQueryError` and its message starts with the
 *   incorrect-credentials prefix.
 */
function isBigQueryIncorrectCredentialsError(
  err: unknown
): err is BigQueryIncorrectCredentialsError {
  if (!isBigQueryError(err)) {
    return false;
  }
  const incorrectCredentialsPrefix =
    "Incorrect username or password was specified";
  return err.message.startsWith(incorrectCredentialsPrefix);
}
/**
 * Activity inbound interceptor that casts known errors.
 *
 * It runs the next interceptor in the chain and rethrows errors matched by one of the
 * known-error guards as `ExternalOAuthTokenError`; any other error is rethrown unchanged.
 */
export class BigQueryCastKnownErrorsInterceptor
implements ActivityInboundCallsInterceptor
{
/**
 * Executes the activity by delegating to `next`.
 *
 * @param input - activity execution input, forwarded untouched to `next`.
 * @param next - continuation that invokes the rest of the interceptor chain.
 * @returns whatever `next(input)` resolves to.
 * @throws ExternalOAuthTokenError when the caught error matches the expired-password,
 *   account-locked or incorrect-credentials guard; otherwise the original error.
 */
async execute(
input: ActivityExecuteInput,
next: Next<ActivityInboundCallsInterceptor, "execute">
): Promise<unknown> {
try {
return await next(input);
} catch (err: unknown) {
if (
isBigQueryExpiredPasswordError(err) ||
// Technically, the one below could be transient;
// we add it here to make the user aware that getting locked out of their account blocks the connection.
isBigQueryAccountLockedError(err) ||
isBigQueryIncorrectCredentialsError(err)
) {
throw new ExternalOAuthTokenError(err);
}
throw err;
}
// NOTE(review): the two lines below are unreachable, since the try block returns and the
// catch block always throws. This looks like two alternative bodies merged together;
// confirm which one is intended.
// Will add custom error handling as we discover them
return next(input);
}
}
1 change: 1 addition & 0 deletions core/src/oauth/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl Credential {
"auth_provider_x509_cert_url",
"client_x509_cert_url",
"universe_domain",
"location",
]
}
};
Expand Down
7 changes: 5 additions & 2 deletions core/src/oauth/tests/functional_credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ async fn test_oauth_credentials_bigquery_flow_ok() {
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test",
"universe_domain": "googleapis.com"
"universe_domain": "googleapis.com",
"location": "EU"
}
});

Expand Down Expand Up @@ -209,6 +210,7 @@ async fn test_oauth_credentials_bigquery_flow_ok() {
content.get("client_email").unwrap(),
"test@test-project.iam.gserviceaccount.com"
);
// The BigQuery location is stored under the "location" key of the credential content;
// there is no "region" key, so looking that up would make `unwrap()` panic.
assert_eq!(content.get("location").unwrap(), "EU");
}

#[tokio::test]
Expand All @@ -231,7 +233,8 @@ async fn test_oauth_credentials_bigquery_delete_ok() {
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test",
"universe_domain": "googleapis.com"
"universe_domain": "googleapis.com",
"location": "US"
}
});

Expand Down
Loading

0 comments on commit e2e0649

Please sign in to comment.