Skip to content

Commit

Permalink
Cache Nango tokens in Gdrive (#2881)
Browse files Browse the repository at this point in the history
* Cache Nango token in Gdrive

* Clean up after self review

* Use cache in Gdrive
  • Loading branch information
lasryaric authored Dec 14, 2023
1 parent 21167bd commit 411510b
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 44 deletions.
17 changes: 9 additions & 8 deletions connectors/src/connectors/google_drive/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
GoogleDriveSyncToken,
GoogleDriveWebhook,
} from "@connectors/lib/models/google_drive";
import { getConnectionFromNango } from "@connectors/lib/nango_helpers";
import logger from "@connectors/logger/logger";

import { registerWebhook } from "../lib";
Expand Down Expand Up @@ -93,8 +94,7 @@ export async function getGoogleCredentials(
return await nango_client().getConnection(
NANGO_GOOGLE_DRIVE_CONNECTOR_ID,
nangoConnectionId,
false,
true
false
);
}

Expand All @@ -104,12 +104,13 @@ export async function getAuthObject(
if (!NANGO_GOOGLE_DRIVE_CONNECTOR_ID) {
throw new Error("NANGO_GOOGLE_DRIVE_CONNECTOR_ID is not defined");
}
const res: NangoGetConnectionRes = await nango_client().getConnection(
NANGO_GOOGLE_DRIVE_CONNECTOR_ID,
nangoConnectionId,
false,
true
);
const res: NangoGetConnectionRes = await getConnectionFromNango({
connectionId: nangoConnectionId,
integrationId: NANGO_GOOGLE_DRIVE_CONNECTOR_ID,
refreshToken: false,
useCache: true,
});

const oauth2Client = new google.auth.OAuth2();
oauth2Client.setCredentials({
access_token: res.credentials.access_token,
Expand Down
2 changes: 0 additions & 2 deletions connectors/src/connectors/notion/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,11 @@ export async function updateNotionConnector(
const connectionRes = await nango_client().getConnection(
NANGO_NOTION_CONNECTOR_ID,
oldConnectionId,
false,
false
);
const newConnectionRes = await nango_client().getConnection(
NANGO_NOTION_CONNECTOR_ID,
connectionId,
false,
false
);

Expand Down
1 change: 0 additions & 1 deletion connectors/src/lib/nango_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ class CustomNango extends Nango {
async getConnection(
providerConfigKey: string,
connectionId: string,
forceRefresh?: boolean,
refreshToken?: boolean
) {
try {
Expand Down
103 changes: 70 additions & 33 deletions connectors/src/lib/nango_helpers.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
import { redisClient } from "@connectors/lib/redis";
import { cacheWithRedis } from "@dust-tt/types";

import { NangoConnectionId } from "@connectors/types/nango_connection_id";

import { nango_client } from "./nango_client";

const NANGO_ACCESS_TOKEN_TTL_SECONDS = 60 * 5; // 5 minutes

async function _getAccessTokenFromNango({
connectionId,
integrationId,
}: {
connectionId: NangoConnectionId;
integrationId: string;
}) {
const accessToken = await nango_client().getToken(
integrationId,
connectionId
);
return accessToken;
}

const _cachedGetAccessTokenFromNango = cacheWithRedis(
_getAccessTokenFromNango,
({ connectionId, integrationId }) => {
return `${integrationId}-${connectionId}`;
},
NANGO_ACCESS_TOKEN_TTL_SECONDS * 1000
);

export async function getAccessTokenFromNango({
connectionId,
integrationId,
Expand All @@ -14,49 +37,63 @@ export async function getAccessTokenFromNango({
integrationId: string;
useCache?: boolean;
}) {
const cacheKey = `nango_access_token:${integrationId}/${connectionId}`;
const redis = await redisClient();

try {
const _setCache = (token: string) =>
redis.set(cacheKey, token, {
EX: NANGO_ACCESS_TOKEN_TTL_SECONDS,
});

if (!useCache) {
const accessToken = await _getAccessTokenFromNango({
connectionId,
integrationId,
});
await _setCache(accessToken);
return accessToken;
}

const maybeAccessToken = await redis.get(cacheKey);
if (maybeAccessToken) {
return maybeAccessToken;
}
const accessToken = await nango_client().getToken(
if (useCache) {
return await _cachedGetAccessTokenFromNango({
connectionId,
integrationId,
connectionId
);
await _setCache(accessToken);
return accessToken;
} finally {
await redis.quit();
});
} else {
return await _getAccessTokenFromNango({ connectionId, integrationId });
}
}

async function _getAccessTokenFromNango({
async function _getConnectionFromNango({
connectionId,
integrationId,
refreshToken,
}: {
connectionId: NangoConnectionId;
integrationId: string;
refreshToken?: boolean;
}) {
const accessToken = await nango_client().getToken(
const accessToken = await nango_client().getConnection(
integrationId,
connectionId
connectionId,
refreshToken
);
return accessToken;
}

const _getCachedConnectionFromNango = cacheWithRedis(
_getConnectionFromNango,
({ connectionId, integrationId, refreshToken }) => {
return `${integrationId}-${connectionId}-${refreshToken}`;
},
NANGO_ACCESS_TOKEN_TTL_SECONDS * 1000
);

export async function getConnectionFromNango({
connectionId,
integrationId,
refreshToken = false,
useCache = false,
}: {
connectionId: NangoConnectionId;
integrationId: string;
refreshToken?: boolean;
useCache?: boolean;
}) {
if (useCache) {
return await _getCachedConnectionFromNango({
connectionId,
integrationId,
refreshToken,
});
} else {
return await _getConnectionFromNango({
connectionId,
integrationId,
refreshToken,
});
}
}

0 comments on commit 411510b

Please sign in to comment.