Skip to content

Commit

Permalink
fix: correct index and delete actions of vector store file job
Browse files Browse the repository at this point in the history
  • Loading branch information
jack0pan committed Jul 15, 2024
1 parent 8534fcf commit 0253b6f
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 25 deletions.
39 changes: 28 additions & 11 deletions jobs/vector_store_file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export class VectorStoreFileJob {
vsf,
{
status: "completed",
usage_bytes: await VectorDb.size(vectorStoreId, fileId),
},
vectorStoreId,
operation,
Expand All @@ -112,8 +113,8 @@ export class VectorStoreFileJob {
{
file_counts: {
...vs.file_counts,
in_progress: vs.file_counts.in_progress - 1,
completed: vs.file_counts.completed + 1,
in_progress: this.accumulate(-1, vs.file_counts.in_progress),
completed: this.accumulate(1, vs.file_counts.completed),
},
usage_bytes: await VectorDb.size(vectorStoreId),
},
Expand All @@ -135,51 +136,67 @@ export class VectorStoreFileJob {
organization: string,
vectorStoreId: string,
fileId: string,
status: VectorStoreFileObject["status"],
) {
const logName = `vector store(${vectorStoreId}) and file(${fileId})`;
log.info(`${LOG_TAG} start {delete} action for ${logName}`);
const vsfRepo = VectorStoreFileRepository.getInstance();
const vsRepo = VectorStoreRepository.getInstance();
let vsf, vs;
let vs;
try {
vsf = await vsfRepo.findById(fileId, vectorStoreId);
vs = await vsRepo.findById(vectorStoreId, organization);
} catch (e) {
log.error(`${LOG_TAG} find ${logName} with ${e}`);
}
if (!vsf || !vs) return;
if (!vs) return;

await VectorDb.delete(vectorStoreId, fileId);

vsRepo.update(
await vsRepo.update(
vs,
{
usage_bytes: await VectorDb.size(vectorStoreId),
file_counts: {
...vs.file_counts,
[vsf.status]: vs.file_counts[vsf.status] - 1,
total: vs.file_counts.total - 1,
[status]: this.accumulate(-1, vs.file_counts[status]),
total: this.accumulate(-1, vs.file_counts.total),
},
},
organization,
);

log.info(`${LOG_TAG} start {delete} action for ${logName}`);
log.info(`${LOG_TAG} complete {delete} action for ${logName}`);
}

private static accumulate(b: number, a?: number | null) {
if (a) {
return a + b > 0 ? a + b : 0;
} else {
if (b > 0) {
return b;
}
return 0;
}
}

public static async execute(args: {
organization: string;
vectorStoreId: string;
fileId: string;
action: "index" | "delete";
status?: string;
}) {
const { action, vectorStoreId, organization, fileId } = args;
switch (action) {
case "index":
await this.index(organization, vectorStoreId, fileId);
break;
case "delete":
await this.delete(organization, vectorStoreId, fileId);
await this.delete(
organization,
vectorStoreId,
fileId,
args["status"] as VectorStoreFileObject["status"],
);
break;
}
}
Expand Down
18 changes: 15 additions & 3 deletions providers/vector_db/pgvector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,22 @@ export default class Client {
await this.query(sql);
}

public static async size(vectorStoreId: string): Promise<number> {
const sql = `SELECT pg_table_size('${vectorStoreId}') as size`;
public static async size(
vectorStoreId: string,
fileId?: string,
): Promise<number> {
let query = `SELECT pg_table_size('${vectorStoreId}') as size`;
if (fileId) {
query = `WITH record_sizes AS (
SELECT pg_column_size(t.*) AS column_size FROM ${vectorStoreId} t WHERE file_id = '${fileId}'
)
SELECT
sum(column_size) AS size
FROM
record_sizes;`;
}

const { rows: [{ size }] } = await this.query(sql);
const { rows: [{ size }] } = await this.query(query);
return Number(size);
}

Expand Down
22 changes: 18 additions & 4 deletions repositories/vector_store_file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from "$/schemas/openai/mod.ts";
import { Jina } from "$/providers/reader/jina.ts";
import { ulid } from "$std/ulid/mod.ts";
import { VectorStoreRepository } from "$/repositories/vector_store.ts";

const DEFAULT_CHUNKING_STRATEGY = {
type: "static",
Expand Down Expand Up @@ -90,27 +91,39 @@ export class VectorStoreFileRepository

async createByFileIdWithJob(
fileId: string,
vectorStoreId: string,
vectorStore: VectorStoreObject,
organization: string,
chunkingStrategy?: AutoChunkingStrategy | StaticChunkingStrategy | null,
) {
const operation = kv.atomic();

const value = await this.createByFileId(
fileId,
vectorStoreId,
vectorStore.id,
chunkingStrategy,
operation,
);
operation.enqueue({
type: "vector_store_file",
args: JSON.stringify({
action: "index",
vectorStoreId,
fileId,
vectorStoreId: vectorStore.id,
fileId: value.id,
organization,
}),
});
await VectorStoreRepository.getInstance().update(
vectorStore,
{
file_counts: {
...vectorStore.file_counts,
in_progress: vectorStore.file_counts.in_progress + 1,
total: vectorStore.file_counts.total + 1,
},
},
organization,
operation,
);

const { ok } = await operation.commit();
if (!ok) throw new Conflict();
Expand All @@ -132,6 +145,7 @@ export class VectorStoreFileRepository
vectorStoreId: vs.id,
fileId: vsf.id,
organization,
status: vsf.status,
}),
});

Expand Down
6 changes: 4 additions & 2 deletions routes/v1/vector_stores/[vector_store_id]/files/[file_id].ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { FreshContext, Handlers } from "$fresh/server.ts";
import type { VectorStoreFileObject } from "$open-schemas/types/openai/mod.ts";
import { DeleteVectorStoreFileResponse } from "$open-schemas/zod/openai/mod.ts";
import {
DeleteVectorStoreFileResponse,
VectorStoreFileObject,
} from "$/schemas/openai/mod.ts";
import { VectorStoreFileRepository } from "$/repositories/vector_store_file.ts";
import { getVectorStore } from "$/routes/v1/vector_stores/[vector_store_id].ts";

Expand Down
12 changes: 7 additions & 5 deletions routes/v1/vector_stores/[vector_store_id]/files/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { FreshContext, Handlers } from "$fresh/server.ts";
import type { VectorStoreFileObject } from "$open-schemas/types/openai/mod.ts";
import {
CreateVectorStoreFileRequest,
Ordering,
Pagination,
VectorStoreFileObject,
} from "$/schemas/openai/mod.ts";
import { VectorStoreFileRepository } from "$/repositories/vector_store_file.ts";
import { getVectorStore } from "$/routes/v1/vector_stores/[vector_store_id].ts";

export const handler: Handlers<VectorStoreFileObject | null> = {
async GET(_req: Request, ctx: FreshContext) {
const params = Object.fromEntries(ctx.url.searchParams);
const organization = ctx.state.organization as string;
// const organization = ctx.state.organization as string;
const vectorStoreId = ctx.params.vector_store_id as string;

const page = await VectorStoreFileRepository.getInstance().findAllByPage(
organization,
vectorStoreId,
Pagination.parse(params),
Ordering.parse(params),
);
Expand All @@ -23,13 +25,13 @@ export const handler: Handlers<VectorStoreFileObject | null> = {

async POST(req: Request, ctx: FreshContext) {
const fields = CreateVectorStoreFileRequest.parse(await req.json());
const vectorStoreId = ctx.params.vector_store_id as string;
const vectorStore = await getVectorStore(ctx);
const organization = ctx.state.organization as string;

const vectorStoreFile = await VectorStoreFileRepository.getInstance()
.createByFileIdWithJob(
fields.file_id,
vectorStoreId,
vectorStore,
organization,
fields.chunking_strategy,
);
Expand Down

0 comments on commit 0253b6f

Please sign in to comment.