Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refresh AI cache on proposal update #254

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ router.post('/ai/summary/:id', async (req, res) => {
const aiSummary = new AiSummary(id, storageEngine(process.env.AI_SUMMARY_SUBDIR));

try {
const cachedSummary = await aiSummary.getCache();
const cachedSummary = (await aiSummary.isCacheable()) && (await aiSummary.getCache());

let summary = '';

Expand All @@ -66,7 +66,7 @@ router.post('/ai/tts/:id', async (req, res) => {
const aiTextTpSpeech = new AiTextToSpeech(id, storageEngine(process.env.AI_TTS_SUBDIR));

try {
const cachedAudio = await aiTextTpSpeech.getCache();
const cachedAudio = (await aiTextTpSpeech.isCacheable()) && (await aiTextTpSpeech.getCache());

let audio: Buffer;

Expand Down
4 changes: 3 additions & 1 deletion src/helpers/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import { fetchWithKeepAlive } from './utils';

// A Snapshot proposal as returned by the hub's GraphQL API.
export type Proposal = {
  id: string;
  // Narrowed from `string` to a literal union so state checks are type-safe.
  state: 'pending' | 'active' | 'closed';
  choices: string[];
  space: Space;
  votes: number;
  author: string;
  title: string;
  body: string;
  discussion: string;
  // Timestamp of the proposal's last edit (per PR discussion) — used to
  // invalidate AI caches when a pending proposal is updated.
  updated: number;
};

export type Vote = {
Expand Down Expand Up @@ -78,6 +79,7 @@ const PROPOSAL_QUERY = gql`
network
name
}
updated
}
}
`;
Expand Down
19 changes: 16 additions & 3 deletions src/lib/ai/summary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { fetchProposal, Proposal } from '../../helpers/snapshot';
import { IStorage } from '../storage/types';
import Cache from '../cache';

const tempCacheIds = new Map<string, number>();

class Summary extends Cache {
proposal?: Proposal | null;
openAi: OpenAI;
Expand All @@ -16,9 +18,8 @@ class Summary extends Cache {
async isCacheable() {
this.proposal = await fetchProposal(this.id);

if (!this.proposal) {
throw new Error('RECORD_NOT_FOUND');
}
if (!this.proposal) throw new Error('RECORD_NOT_FOUND');
if (this.#cacheExpired()) return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If cache is expired, isCacheable should return true right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, false, since the item is not cache-able anymore


return true;
}
Expand Down Expand Up @@ -53,6 +54,18 @@ class Summary extends Cache {
throw e.error?.code ? new Error(e.error?.code.toUpperCase()) : e;
}
};

// A cached summary for a pending proposal becomes stale once the
// proposal's `updated` timestamp differs from the one recorded at
// cache-creation time. Non-pending states never expire here.
#cacheExpired = () => {
  const { id, state, updated } = this.proposal!;

  // Only pending proposals are subject to this expiry check.
  if (state !== 'pending') return false;

  // Nothing recorded for this proposal yet: not expired.
  if (!tempCacheIds.has(id)) return false;

  return tempCacheIds.get(id) !== updated;
};

// Remember which revision (`updated` timestamp) of the proposal was
// cached, so #cacheExpired can detect later edits.
afterCreateCache() {
  const { id, updated } = this.proposal!;
  tempCacheIds.set(id, updated);
}
Comment on lines +63 to +68
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can't rely on the updated param for the cache; a pending proposal can be updated any number of times

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updated value will be cached only after cache creation. So each time it changes, it will invalidate the cache

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh updated is timestamp. got it

}

export default Summary;
18 changes: 15 additions & 3 deletions src/lib/ai/textToSpeech.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { IStorage } from '../storage/types';

const MIN_BODY_LENGTH = 1;
const MAX_BODY_LENGTH = 4096;
const tempCacheIds = new Map<string, number>();

export default class TextToSpeech extends Cache {
proposal?: Proposal | null;
Expand All @@ -20,9 +21,8 @@ export default class TextToSpeech extends Cache {
async isCacheable() {
this.proposal = await fetchProposal(this.id);

if (!this.proposal) {
throw new Error('RECORD_NOT_FOUND');
}
if (!this.proposal) throw new Error('RECORD_NOT_FOUND');
if (this.#cacheExpired()) return false;

return true;
}
Expand All @@ -47,4 +47,16 @@ export default class TextToSpeech extends Cache {
throw e.error?.code ? new Error(e.error?.code.toUpperCase()) : e;
}
};

// Stale-cache detection for pending proposals: compares the proposal's
// current `updated` timestamp against the one recorded when the cache
// was created.
// NOTE(review): duplicated in src/lib/ai/summary.ts — consider hoisting
// into the shared Cache base class.
#cacheExpired = () => {
  const proposal = this.proposal!;

  if (proposal.state !== 'pending') return false;

  return (
    tempCacheIds.has(proposal.id) && tempCacheIds.get(proposal.id) !== proposal.updated
  );
};

// Record the cached revision's `updated` timestamp for later expiry checks.
afterCreateCache() {
  const proposal = this.proposal!;
  tempCacheIds.set(proposal.id, proposal.updated);
}
}
6 changes: 5 additions & 1 deletion src/lib/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ export default class Cache {
await this.isCacheable();
const content = await this.getContent();

console.log(`[votes-report] File cache ready to be saved`);
console.log(`[${this.constructor.name}] File cache ready to be saved`);

this.storage.set(this.filename, content);
this.afterCreateCache();

return content;
}

// Stable identifier combining the concrete subclass name and record id,
// e.g. `Summary#0xabc` — used as the queue/progress map key.
toString() {
  return [this.constructor.name, this.id].join('#');
}
}
26 changes: 17 additions & 9 deletions src/lib/queue.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
import { sleep } from '../helpers/utils';
import { capture } from '@snapshot-labs/snapshot-sentry';
import Cache from './cache';
import { timeQueueProcess } from './metrics';
import type Cache from './cache';

const queues = new Map<string, Cache>();
const processingItems = new Map<string, Cache>();

// Generate and persist the cache for a single queued item.
// AI tasks (Summary / TextToSpeech) are only refreshed when a cached
// copy already exists, so AI content is not eagerly generated for
// every proposal.
async function processItem(cacheable: Cache) {
  console.log(`[queue] Processing queue item: ${cacheable}`);

  // Queue/progress maps are keyed by `toString()` (`ClassName#id`) so
  // different task types for the same proposal id do not collide.
  // Computed once instead of on every map operation.
  const key = cacheable.toString();

  try {
    // NOTE(review): matching on constructor.name is fragile (breaks under
    // minification or class renaming); an instanceof check would be safer.
    if (
      ['Summary', 'TextToSpeech'].includes(cacheable.constructor.name) &&
      !(await cacheable.getCache())
    ) {
      return;
    }

    const end = timeQueueProcess.startTimer({ name: cacheable.constructor.name });
    processingItems.set(key, cacheable);

    await cacheable.createCache();
    end();
  } catch (e) {
    capture(e, { id: key });
    console.error(`[queue] Error while processing item`, e);
  } finally {
    // Always clear bookkeeping, whether the item succeeded, failed, or
    // was skipped by the early return above.
    queues.delete(key);
    processingItems.delete(key);
  }
}

export function queue(cacheable: Cache) {
if (!queues.has(cacheable.id)) {
queues.set(cacheable.id, cacheable);
if (!queues.has(cacheable.toString())) {
queues.set(cacheable.toString(), cacheable);
}

return queues.size;
Expand All @@ -35,8 +43,8 @@ export function size() {
}

export function getProgress(id: string) {
if (processingItems.has(id)) {
return processingItems.get(id)?.generationProgress as number;
if (processingItems.has(id.toString())) {
return processingItems.get(id.toString())?.generationProgress as number;
}

return 0;
Expand Down
4 changes: 0 additions & 4 deletions src/lib/votesReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,6 @@ class VotesReport extends Cache {
return votes;
};

toString() {
return `VotesReport#${this.id}`;
}

#formatCsvLine = (vote: Vote) => {
let choices: Vote['choice'][] = [];

Expand Down
13 changes: 10 additions & 3 deletions src/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,28 @@ import express from 'express';
import { rpcError, rpcSuccess, storageEngine } from './helpers/utils';
import { capture } from '@snapshot-labs/snapshot-sentry';
import VotesReport from './lib/votesReport';
import Summary from './lib/ai/summary';
import TextToSpeech from './lib/ai/textToSpeech';
import { queue } from './lib/queue';

const router = express.Router();

function processVotesReport(id: string, event: string) {
function processEvent(id: string, event: string) {
if (event == 'proposal/end') {
queue(new VotesReport(id, storageEngine(process.env.VOTE_REPORT_SUBDIR)));
}

if (event === 'proposal/start') {
queue(new Summary(id, storageEngine(process.env.AI_SUMMARY_SUBDIR)));
queue(new TextToSpeech(id, storageEngine(process.env.AI_TTS_SUBDIR)));
}
}

router.post('/webhook', (req, res) => {
const body = req.body || {};
const event = body.event?.toString() ?? '';
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { type, id } = body.id?.toString().split('/');
const [type, id] = body.id?.toString().split('/');

if (req.headers['authentication'] !== `${process.env.WEBHOOK_AUTH_TOKEN ?? ''}`) {
return rpcError(res, 'UNAUTHORIZED', id);
Expand All @@ -27,7 +34,7 @@ router.post('/webhook', (req, res) => {
}

try {
processVotesReport(id, event);
processEvent(id, event);
return rpcSuccess(res, 'Webhook received', id);
} catch (e) {
capture(e, { body });
Expand Down
78 changes: 78 additions & 0 deletions test/unit/lib/ai/summary.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import Summary from '../../../../src/lib/ai/summary';
import * as snapshotHelper from '../../../../src/helpers/snapshot';
import { storageEngine } from '../../../../src/helpers/utils';

const fetchProposalMock = jest.spyOn(snapshotHelper, 'fetchProposal');

describe('AI summary', () => {
  describe('isCacheable()', () => {
    describe('when the proposal is pending', () => {
      it('returns true if the proposal has not been cached yet', async () => {
        expect.assertions(2);
        const summary = new Summary('1', storageEngine());
        fetchProposalMock.mockResolvedValueOnce({
          id: '2',
          state: 'pending',
          updated: 1
        } as snapshotHelper.Proposal);

        // Async matchers must be awaited, otherwise the test can finish
        // before the assertion has actually run.
        await expect(summary.isCacheable()).resolves.toBe(true);
        expect(fetchProposalMock).toHaveBeenCalledTimes(1);
      });

      // Description fixed: this body caches `updated: 1`, then sees
      // `updated: 2` — i.e. the proposal HAS been updated since last cache.
      it('returns false if the proposal has been updated since last cache', async () => {
        expect.assertions(2);
        const summary = new Summary('summary-1', storageEngine());
        fetchProposalMock.mockResolvedValueOnce({
          id: '1',
          state: 'pending',
          updated: 1
        } as snapshotHelper.Proposal);
        await summary.isCacheable();
        summary.afterCreateCache();

        // Same proposal, but with a newer `updated` timestamp.
        fetchProposalMock.mockResolvedValueOnce({
          id: '1',
          state: 'pending',
          updated: 2
        } as snapshotHelper.Proposal);

        await expect(summary.isCacheable()).resolves.toBe(false);
        expect(fetchProposalMock).toHaveBeenCalledTimes(2);
      });

      // Description fixed: this body returns the SAME `updated` value on
      // both fetches — i.e. the proposal has NOT been updated.
      it('returns true if the proposal has not been updated since last cache', async () => {
        expect.assertions(2);
        const summary = new Summary('1', storageEngine());
        fetchProposalMock.mockResolvedValue({
          id: '3',
          state: 'pending',
          updated: 1
        } as snapshotHelper.Proposal);
        await summary.isCacheable();
        summary.afterCreateCache();

        await expect(summary.isCacheable()).resolves.toBe(true);
        expect(fetchProposalMock).toHaveBeenCalledTimes(2);
      });
    });

    it('returns true when the proposal exists', async () => {
      expect.assertions(2);
      const summary = new Summary('1', storageEngine());
      fetchProposalMock.mockResolvedValueOnce({} as snapshotHelper.Proposal);

      await expect(summary.isCacheable()).resolves.toBe(true);
      expect(fetchProposalMock).toHaveBeenCalledTimes(1);
    });

    // Description fixed: isCacheable() rejects (throws) here, it does not
    // resolve to false.
    it('throws an error when the proposal does not exist', async () => {
      expect.assertions(2);
      const summary = new Summary('1', storageEngine());
      fetchProposalMock.mockRejectedValueOnce(new Error('RECORD_NOT_FOUND'));

      await expect(summary.isCacheable()).rejects.toThrow('RECORD_NOT_FOUND');
      expect(fetchProposalMock).toHaveBeenCalledTimes(1);
    });
  });
});
Loading