fix: reduce the number of requests by batching
sshivaditya committed Jan 4, 2025
1 parent ed3ad2e commit b53a607
Showing 1 changed file with 156 additions and 132 deletions.
functions/issue-scraper.ts
@@ -6,24 +6,43 @@ import markdownit from "markdown-it";
import plainTextPlugin from "markdown-it-plain-text";
import { validatePOST } from "./validators";

const VECTOR_SIZE = 1024;

interface MarkdownItWithPlainText extends markdownit {
plainText: string;
}

interface IssueMetadata {
nodeId: string;
number: number;
title: string;
body: string;
state: string;
repositoryName: string;
repositoryId: number;
assignees: string[];
authorId: number;
createdAt: string;
closedAt: string | null;
stateReason: string | null;
updatedAt: string;
interface PayloadType {
issue: {
nodeId: string;
number: number;
title: string;
body: string;
state: string;
stateReason: string | null;
repositoryName: string;
repositoryId: number;
assignees: string[];
createdAt: string;
closedAt: string | null;
updatedAt: string;
};
action: string;
sender: {
login: string;
};
repository: {
id: number;
node_id: string;
name: string;
full_name: string;
owner: {
login: string;
id: number;
type: string;
site_admin: boolean;
};
};
}

interface IssueNode {
@@ -84,7 +103,12 @@ export async function onRequest(ctx: Context): Promise<Response> {
const githubUserName = result.gitHubUser.login;
try {
const supabase = new SupabaseClient(env.SUPABASE_URL, env.SUPABASE_KEY);
const response = await issueScraper(githubUserName, supabase, env.VOYAGEAI_API_KEY, result.authToken);
const response = await issueScraper(
githubUserName,
supabase,
env.VOYAGEAI_API_KEY,
result.authToken
);
return new Response(response, {
headers: corsHeaders,
status: 200,
Expand Down Expand Up @@ -165,27 +189,18 @@ const SEARCH_ISSUES_QUERY = `
}
`;

async function fetchAuthorId(octokit: InstanceType<typeof Octokit>, login: string): Promise<number> {
try {
const response = await octokit.rest.users.getByUsername({ username: login });
return response.data.id;
} catch (error) {
console.error(`Error fetching author ID for ${login}:`, error);
return -1;
}
}

async function fetchUserIssues(octokit: InstanceType<typeof Octokit>, username: string): Promise<IssueNode[]> {
async function fetchUserIssuesBatch(
octokit: InstanceType<typeof Octokit>,
username: string
): Promise<IssueNode[]> {
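// Walk the search connection page by page, following pageInfo.endCursor until hasNextPage is false.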
const allIssues: IssueNode[] = [];
let hasNextPage = true;
let cursor: string | null = null;

const searchText = `assignee:${username} is:issue is:closed`;

while (hasNextPage) {
const variables: { searchText: string; after?: string } = {
searchText,
};
const variables: { searchText: string; after?: string } = { searchText };
if (cursor) {
variables.after = cursor;
}
@@ -197,103 +212,106 @@ async function fetchUserIssues(octokit: InstanceType<typeof Octokit>, username:

hasNextPage = response.search.pageInfo.hasNextPage;
cursor = response.search.pageInfo.endCursor;

if (!cursor) break;
}

return allIssues;
}

// Pulls issues from GitHub and stores them in Supabase
async function issueScraper(username: string, supabase: SupabaseClient, voyageApiKey: string, token?: string): Promise<string> {
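// Embeds all issue texts in a single VoyageAI request instead of one embed call per issue.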
async function batchEmbeddings(voyageClient: VoyageAIClient, texts: string[]): Promise<(number[] | undefined)[]> {
try {
const embeddingResponse = await voyageClient.embed({
input: texts,
model: "voyage-large-2-instruct",
inputType: "document",
});
return embeddingResponse.data?.map((item) => item.embedding) || [];
} catch (error) {
console.error("Error batching embeddings:", error);
throw error;
}
}

async function batchUpsertIssues(
supabase: SupabaseClient,
issues: Array<{
id: string;
markdown: string;
plaintext: string;
embedding: string;
author_id: number;
payload: PayloadType;
}>
): Promise<void> {
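// A single bulk upsert writes every row in one Supabase round trip.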
const { error } = await supabase.from("issues").upsert(issues);
if (error) {
throw new Error(`Error during batch upsert: ${error.message}`);
}
}

async function batchFetchAuthorIds(
octokit: InstanceType<typeof Octokit>,
logins: string[]
): Promise<Record<string, number>> {
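// Resolve logins to GitHub user IDs in parallel, BATCH_SIZE at a time; a failed lookup falls back to -1.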
const authorIdMap: Record<string, number> = {};
const BATCH_SIZE = 20;
for (let i = 0; i < logins.length; i += BATCH_SIZE) {
const batch = logins.slice(i, i + BATCH_SIZE);
const promises = batch.map(async (login) => {
try {
const response = await octokit.rest.users.getByUsername({ username: login });
return { login, id: response.data.id };
} catch (error) {
console.error(`Error fetching author ID for ${login}:`, error);
return { login, id: -1 };
}
});
const results = await Promise.all(promises);
results.forEach(({ login, id }) => {
authorIdMap[login] = id;
});
}
return authorIdMap;
}

async function issueScraper(
username: string,
supabase: SupabaseClient,
voyageApiKey: string,
token?: string
): Promise<string> {
try {
if (!username) {
throw new Error("Username is required");
}

const context = {
adapters: {},
logger: {
info: (message: string, data: Record<string, unknown>) => console.log("INFO:", message + ":", data),
error: (message: string, data: Record<string, unknown>) => console.error("ERROR:", message + ":", data),
},
octokit: new Octokit(token ? { auth: token } : {}),
};

const octokit = new Octokit(token ? { auth: token } : {});
const voyageClient = new VoyageAIClient({ apiKey: voyageApiKey });
const issues = await fetchUserIssues(context.octokit, username);
const processedIssues: Array<{ issue: IssueMetadata; error?: string }> = [];
for (const issue of issues) {
try {
const authorId = issue.author?.login ? await fetchAuthorId(context.octokit, issue.author.login) : -1;
const repoOwner = issue.repository.owner.login;

const metadata: IssueMetadata = {
nodeId: issue.id,
number: issue.number,
title: issue.title || "",
body: issue.body || "",
state: issue.state,
stateReason: issue.stateReason,
repositoryName: issue.repository.name,
repositoryId: parseInt(issue.repository.id),
assignees: (issue.assignees?.nodes || []).map((assignee) => assignee.login),
authorId,
createdAt: issue.createdAt,
closedAt: issue.closedAt,
updatedAt: issue.updatedAt,
};
const markdown = metadata.body + " " + metadata.title;
const plaintext = markdownToPlainText(markdown);
if (!plaintext || plaintext === null) {
throw new Error("Error converting markdown to plaintext");
}
const embeddingObject = await voyageClient.embed({
input: markdown,
model: "voyage-large-2-instruct",
inputType: "document",
});
const embedding = (embeddingObject.data && embeddingObject.data[0]?.embedding) || {};
const payload = {
issue: metadata,
action: "created",
sender: {
login: username,
},
repository: {
id: parseInt(issue.repository.id),
node_id: issue.repository.id,
name: issue.repository.name,
full_name: `${repoOwner}/${issue.repository.name}`,
owner: {
login: repoOwner,
id: authorId,
type: "User",
site_admin: false,
},
},
};
// Check if the user is authenticated; getUser() returns a promise, so it must be awaited.
const { data: userData } = await supabase.auth.getUser();
if (!userData?.user) {
throw new Error("User is not authenticated");
}
const issues = await fetchUserIssuesBatch(octokit, username);

const { error } = await supabase.from("issues").upsert({
id: metadata.nodeId,
markdown,
plaintext,
embedding: JSON.stringify(embedding),
author_id: metadata.authorId,
modified_at: metadata.updatedAt,
payload: payload,
});
// Extract unique author logins
const uniqueAuthors = Array.from(
new Set(issues.map((issue) => issue.author?.login).filter((login): login is string => !!login))
);

processedIssues.push({
issue: metadata,
error: error ? `Error storing issue: ${error.message}` : undefined,
});
} catch (error) {
processedIssues.push({
// Fetch author IDs in batches
const authorIdMap = await batchFetchAuthorIds(octokit, uniqueAuthors);

const markdowns = issues.map((issue) => `${issue.body || ""} ${issue.title || ""}`);
const plainTexts = markdowns.map(markdownToPlainText);
const embeddings = await batchEmbeddings(voyageClient, markdowns);

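// Assemble one upsert row per issue from the batched author IDs, plaintexts, and embeddings.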
const upsertData = issues.map((issue, index) => {
const authorId = issue.author?.login ? authorIdMap[issue.author.login] || -1 : -1;
const repoOwner = issue.repository.owner.login;

return {
id: issue.id,
markdown: markdowns[index],
plaintext: plainTexts[index] ?? "",
embedding: JSON.stringify(embeddings[index] || Array(VECTOR_SIZE).fill(0)),
author_id: authorId,
payload: {
issue: {
nodeId: issue.id,
number: issue.number,
Expand All @@ -303,36 +321,42 @@ async function issueScraper(username: string, supabase: SupabaseClient, voyageAp
stateReason: issue.stateReason,
repositoryName: issue.repository.name,
repositoryId: parseInt(issue.repository.id),
assignees: [],
authorId: -1,
assignees: (issue.assignees?.nodes || []).map((a) => a.login),
createdAt: issue.createdAt,
closedAt: issue.closedAt,
updatedAt: issue.updatedAt,
},
error: `Error processing issue: ${error instanceof Error ? error.message : "Unknown error"}`,
});
}
}
action: "created",
sender: { login: username },
repository: {
id: parseInt(issue.repository.id),
node_id: issue.repository.id,
name: issue.repository.name,
full_name: `${repoOwner}/${issue.repository.name}`,
owner: {
login: repoOwner,
id: authorId,
type: "User",
site_admin: false,
},
},
},
};
});

await batchUpsertIssues(supabase, upsertData);

return JSON.stringify(
{
success: true,
stats: {
storageSuccessful: processedIssues.filter((p) => !p.error).length,
storageFailed: processedIssues.filter((p) => p.error).length,
storageSuccessful: upsertData.length,
storageFailed: 0,
},
errors: processedIssues
.filter((p) => p.error)
.map((p) => ({
type: "storage",
name: `${p.issue.repositoryName}#${p.issue.number}`,
error: p.error,
})),
issues: processedIssues.map((p) => ({
number: p.issue.number,
title: p.issue.title,
repo: p.issue.repositoryName,
error: p.error,
issues: upsertData.map((issue) => ({
id: issue.id,
markdown: issue.markdown,
plaintext: issue.plaintext,
})),
},
null,