From 73e3af231eee39d46450c8c3cd0ebcadcb0e2be8 Mon Sep 17 00:00:00 2001 From: Flavien David Date: Thu, 23 Jan 2025 13:59:27 +0100 Subject: [PATCH] Use `WorkspaceAwareModel` on conversation related models (#10184) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use WorkspaceAwareModel on conversation related models * Add migrations * Fix migration * 📖 * Backfill content fragments * 👕 * ➡️ --- front/lib/api/assistant/conversation.ts | 30 ++- front/lib/api/assistant/reaction.ts | 6 +- front/lib/models/assistant/conversation.ts | 13 +- .../resources/content_fragment_resource.ts | 2 + .../storage/models/content_fragment.ts | 4 +- ...orkspace_id_conversation_related_models.ts | 226 ++++++++++++++++++ front/migrations/db/migration_151.sql | 8 + front/migrations/db/migration_152.sql | 31 +++ 8 files changed, 301 insertions(+), 19 deletions(-) create mode 100644 front/migrations/20250123_backfill_workspace_id_conversation_related_models.ts create mode 100644 front/migrations/db/migration_151.sql create mode 100644 front/migrations/db/migration_152.sql diff --git a/front/lib/api/assistant/conversation.ts b/front/lib/api/assistant/conversation.ts index 6f1168bc5874..b16c691e6876 100644 --- a/front/lib/api/assistant/conversation.ts +++ b/front/lib/api/assistant/conversation.ts @@ -316,6 +316,7 @@ async function createOrUpdateParticipation({ conversationId: conversation.id, action: "posted", userId: user.id, + workspaceId: conversation.owner.id, }); } } @@ -774,7 +775,7 @@ export async function* postUserMessage( transaction: t, })) ?? -1) + 1; - async function createMessageAndUserMessage() { + async function createMessageAndUserMessage(workspace: WorkspaceType) { return Message.create( { sId: generateRandomModelSId(), @@ -795,14 +796,16 @@ export async function* postUserMessage( ? 
user.id : ( await attributeUserFromWorkspaceAndEmail( - owner, + workspace, context.email ) )?.id, + workspaceId: workspace.id, }, { transaction: t } ) ).id, + workspaceId: workspace.id, }, { transaction: t, @@ -810,7 +813,7 @@ export async function* postUserMessage( ); } - const m = await createMessageAndUserMessage(); + const m = await createMessageAndUserMessage(owner); const userMessage: UserMessageWithRankType = { id: m.id, created: m.createdAt.getTime(), @@ -843,6 +846,7 @@ export async function* postUserMessage( { messageId: m.id, agentConfigurationId: configuration.sId, + workspaceId: owner.id, }, { transaction: t } ); @@ -852,6 +856,7 @@ export async function* postUserMessage( status: "created", agentConfigurationId: configuration.sId, agentConfigurationVersion: configuration.version, + workspaceId: owner.id, }, { transaction: t } ); @@ -862,6 +867,7 @@ export async function* postUserMessage( conversationId: conversation.id, parentId: userMessage.id, agentMessageId: agentMessageRow.id, + workspaceId: owner.id, }, { transaction: t, @@ -1236,7 +1242,10 @@ export async function* editUserMessage( } const userMessageRow = messageRow.userMessage; // adding messageRow as param otherwise Ts doesn't get it can't be null - async function createMessageAndUserMessage(messageRow: Message) { + async function createMessageAndUserMessage( + workspace: WorkspaceType, + messageRow: Message + ) { return Message.create( { sId: generateRandomModelSId(), @@ -1259,14 +1268,16 @@ export async function* editUserMessage( ? 
userMessageRow.userId : ( await attributeUserFromWorkspaceAndEmail( - owner, + workspace, userMessageRow.userContextEmail ) )?.id, + workspaceId: workspace.id, }, { transaction: t } ) ).id, + workspaceId: workspace.id, }, { transaction: t, @@ -1274,7 +1285,7 @@ export async function* editUserMessage( ); } - const m = await createMessageAndUserMessage(messageRow); + const m = await createMessageAndUserMessage(owner, messageRow); const userMessage: UserMessageWithRankType = { id: m.id, @@ -1320,6 +1331,7 @@ export async function* editUserMessage( { messageId: m.id, agentConfigurationId: configuration.sId, + workspaceId: owner.id, }, { transaction: t } ); @@ -1329,6 +1341,7 @@ export async function* editUserMessage( status: "created", agentConfigurationId: configuration.sId, agentConfigurationVersion: configuration.version, + workspaceId: owner.id, }, { transaction: t } ); @@ -1339,6 +1352,7 @@ export async function* editUserMessage( conversationId: conversation.id, parentId: userMessage.id, agentMessageId: agentMessageRow.id, + workspaceId: owner.id, }, { transaction: t, @@ -1568,6 +1582,7 @@ export async function* retryAgentMessage( agentConfigurationId: messageRow.agentMessage.agentConfigurationId, agentConfigurationVersion: messageRow.agentMessage.agentConfigurationVersion, + workspaceId: auth.getNonNullableWorkspace().id, }, { transaction: t } ); @@ -1579,6 +1594,7 @@ export async function* retryAgentMessage( parentId: messageRow.parentId, version: messageRow.version + 1, agentMessageId: agentMessageRow.id, + workspaceId: auth.getNonNullableWorkspace().id, }, { transaction: t, @@ -1759,6 +1775,7 @@ export async function postNewContentFragment( userContextEmail: context?.email, userContextFullName: context?.fullName, userContextUsername: context?.username, + workspaceId: owner.id, }; const contentFragment = await (() => { @@ -1786,6 +1803,7 @@ export async function postNewContentFragment( rank: nextMessageRank, conversationId: conversation.id, contentFragmentId: 
contentFragment.id, + workspaceId: owner.id, }, { transaction: t, diff --git a/front/lib/api/assistant/reaction.ts b/front/lib/api/assistant/reaction.ts index 7300aef821bf..ca90789cb0d2 100644 --- a/front/lib/api/assistant/reaction.ts +++ b/front/lib/api/assistant/reaction.ts @@ -106,10 +106,7 @@ export async function createMessageReaction( reaction: string; } ): Promise { - const owner = auth.workspace(); - if (!owner) { - throw new Error("Unexpected `auth` without `workspace`."); - } + const owner = auth.getNonNullableWorkspace(); const message = await Message.findOne({ where: { @@ -128,6 +125,7 @@ export async function createMessageReaction( userContextUsername: context.username, userContextFullName: context.fullName, reaction, + workspaceId: owner.id, }); return newReaction !== null; } diff --git a/front/lib/models/assistant/conversation.ts b/front/lib/models/assistant/conversation.ts index b62ec30d1b54..c90f1cf94511 100644 --- a/front/lib/models/assistant/conversation.ts +++ b/front/lib/models/assistant/conversation.ts @@ -13,7 +13,6 @@ import type { AgentMessageContent } from "@app/lib/models/assistant/agent_messag import { frontSequelize } from "@app/lib/resources/storage"; import { ContentFragmentModel } from "@app/lib/resources/storage/models/content_fragment"; import { UserModel } from "@app/lib/resources/storage/models/user"; -import { BaseModel } from "@app/lib/resources/storage/wrappers/base"; import { WorkspaceAwareModel } from "@app/lib/resources/storage/wrappers/workspace_models"; export class Conversation extends WorkspaceAwareModel { @@ -84,7 +83,7 @@ Conversation.init( } ); -export class ConversationParticipant extends BaseModel { +export class ConversationParticipant extends WorkspaceAwareModel { declare createdAt: CreationOptional; declare updatedAt: CreationOptional; @@ -150,7 +149,7 @@ ConversationParticipant.belongsTo(UserModel, { foreignKey: { name: "userId", allowNull: false }, }); -export class UserMessage extends BaseModel { +export 
class UserMessage extends WorkspaceAwareModel { declare createdAt: CreationOptional; declare updatedAt: CreationOptional; @@ -220,7 +219,7 @@ UserMessage.belongsTo(UserModel, { foreignKey: { name: "userId", allowNull: true }, }); -export class AgentMessage extends BaseModel { +export class AgentMessage extends WorkspaceAwareModel { declare createdAt: CreationOptional; declare updatedAt: CreationOptional; declare runIds: string[] | null; @@ -371,7 +370,7 @@ AgentMessageFeedback.belongsTo(AgentMessage, { as: "agentMessage", }); -export class Message extends BaseModel { +export class Message extends WorkspaceAwareModel { declare createdAt: CreationOptional; declare updatedAt: CreationOptional; @@ -512,7 +511,7 @@ Message.belongsTo(ContentFragmentModel, { foreignKey: { name: "contentFragmentId", allowNull: true }, }); -export class MessageReaction extends BaseModel { +export class MessageReaction extends WorkspaceAwareModel { declare createdAt: CreationOptional; declare updatedAt: CreationOptional; @@ -583,7 +582,7 @@ MessageReaction.belongsTo(UserModel, { foreignKey: { name: "userId", allowNull: true }, // null = mention is not a user using a Slackbot }); -export class Mention extends BaseModel { +export class Mention extends WorkspaceAwareModel { declare createdAt: CreationOptional; declare updatedAt: CreationOptional; diff --git a/front/lib/resources/content_fragment_resource.ts b/front/lib/resources/content_fragment_resource.ts index 88f69c1960ee..6ddd98fa22c6 100644 --- a/front/lib/resources/content_fragment_resource.ts +++ b/front/lib/resources/content_fragment_resource.ts @@ -59,6 +59,7 @@ export class ContentFragmentResource extends BaseResource ...blob, sId: generateRandomModelSId("cf"), version: "latest", + workspaceId: blob.workspaceId, }, { transaction, @@ -91,6 +92,7 @@ export class ContentFragmentResource extends BaseResource ...blob, sId, version: "latest", + workspaceId: blob.workspaceId, }, { transaction: t, diff --git 
a/front/lib/resources/storage/models/content_fragment.ts b/front/lib/resources/storage/models/content_fragment.ts index 2c864466da7a..dcc6265d1758 100644 --- a/front/lib/resources/storage/models/content_fragment.ts +++ b/front/lib/resources/storage/models/content_fragment.ts @@ -8,9 +8,9 @@ import { DataTypes } from "sequelize"; import { frontSequelize } from "@app/lib/resources/storage"; import { FileModel } from "@app/lib/resources/storage/models/files"; import { UserModel } from "@app/lib/resources/storage/models/user"; -import { BaseModel } from "@app/lib/resources/storage/wrappers/base"; +import { WorkspaceAwareModel } from "@app/lib/resources/storage/wrappers/workspace_models"; -export class ContentFragmentModel extends BaseModel { +export class ContentFragmentModel extends WorkspaceAwareModel { declare createdAt: CreationOptional; declare updatedAt: CreationOptional; diff --git a/front/migrations/20250123_backfill_workspace_id_conversation_related_models.ts b/front/migrations/20250123_backfill_workspace_id_conversation_related_models.ts new file mode 100644 index 000000000000..05eaf1d03319 --- /dev/null +++ b/front/migrations/20250123_backfill_workspace_id_conversation_related_models.ts @@ -0,0 +1,226 @@ +import type { LightWorkspaceType } from "@dust-tt/types"; +import _ from "lodash"; +import { Op } from "sequelize"; + +import { + AgentMessage, + Conversation, + ConversationParticipant, + Mention, + Message, + MessageReaction, + UserMessage, +} from "@app/lib/models/assistant/conversation"; +import { ContentFragmentModel } from "@app/lib/resources/storage/models/content_fragment"; +import type { WorkspaceAwareModel } from "@app/lib/resources/storage/wrappers/workspace_models"; +import type { Logger } from "@app/logger/logger"; +import { makeScript } from "@app/scripts/helpers"; +import { runOnAllWorkspaces } from "@app/scripts/workspace_helpers"; + +type TableConfig = { + model: typeof WorkspaceAwareModel; + include: (workspaceId: number) => any[]; +}; + 
+const TABLES: TableConfig[] = [
+  {
+    model: ConversationParticipant,
+    include: (workspaceId: number) => [
+      {
+        model: Conversation,
+        required: true,
+        where: { workspaceId },
+      },
+    ],
+  },
+  {
+    model: Message,
+    include: (workspaceId: number) => [
+      {
+        as: "conversation",
+        model: Conversation,
+        required: true,
+        where: { workspaceId },
+      },
+    ],
+  },
+  {
+    model: UserMessage,
+    include: (workspaceId: number) => [
+      {
+        as: "message",
+        model: Message,
+        required: true,
+        include: [
+          {
+            as: "conversation",
+            model: Conversation,
+            required: true,
+            where: { workspaceId },
+          },
+        ],
+      },
+    ],
+  },
+  {
+    model: AgentMessage,
+    include: (workspaceId: number) => [
+      {
+        as: "message",
+        model: Message,
+        required: true,
+        include: [
+          {
+            as: "conversation",
+            model: Conversation,
+            required: true,
+            where: { workspaceId },
+          },
+        ],
+      },
+    ],
+  },
+  {
+    model: ContentFragmentModel,
+    include: (workspaceId: number) => [
+      {
+        as: "message",
+        model: Message,
+        required: true,
+        include: [
+          {
+            as: "conversation",
+            model: Conversation,
+            required: true,
+            where: { workspaceId },
+          },
+        ],
+      },
+    ],
+  },
+  {
+    model: MessageReaction,
+    include: (workspaceId: number) => [
+      {
+        model: Message,
+        required: true,
+        include: [
+          {
+            as: "conversation",
+            model: Conversation,
+            required: true,
+            where: { workspaceId },
+          },
+        ],
+      },
+    ],
+  },
+  {
+    model: Mention,
+    include: (workspaceId: number) => [
+      {
+        model: Message,
+        required: true,
+        include: [
+          {
+            as: "conversation",
+            model: Conversation,
+            required: true,
+            where: { workspaceId },
+          },
+        ],
+      },
+    ],
+  },
+];
+
+async function backfillTable(
+  workspace: LightWorkspaceType,
+  table: TableConfig,
+  { execute, logger }: { execute: boolean; logger: Logger }
+) {
+  let lastSeenId = 0;
+  const batchSize = 1000;
+  let totalProcessed = 0;
+
+  logger.info(
+    { workspaceId: workspace.sId, table: table.model.tableName },
+    "Starting table backfill"
+  );
+
+  for (;;) {
+    const records = await table.model.findAll({
+      where: {
+        id: { [Op.gt]: lastSeenId },
+        workspaceId: { [Op.is]: null },
+      },
+      order: [["id", "ASC"]],
+      limit: batchSize,
+      include: table.include(workspace.id),
+    });
+
+    if (records.length === 0) {
+      break;
+    }
+
+    totalProcessed += records.length;
+    logger.info(
+      {
+        workspaceId: workspace.sId,
+        table: table.model.tableName,
+        batchSize: records.length,
+        totalProcessed,
+        lastId: lastSeenId,
+      },
+      "Processing batch"
+    );
+
+    if (execute) {
+      const recordIds = records.map((r) => r.id);
+      await table.model.update(
+        { workspaceId: workspace.id },
+        {
+          where: {
+            id: { [Op.in]: recordIds },
+          },
+          // Required to avoid hitting validation hook, which does not play nice with bulk updates.
+          hooks: false,
+          fields: ["workspaceId"],
+        }
+      );
+    }
+
+    lastSeenId = records[records.length - 1].id;
+  }
+
+  return totalProcessed;
+}
+
+async function backfillTablesForWorkspace(
+  workspace: LightWorkspaceType,
+  { execute, logger }: { execute: boolean; logger: Logger }
+) {
+  logger.info(
+    { workspaceId: workspace.sId, execute },
+    "Starting workspace backfill"
+  );
+
+  const stats: Record<string, number> = {};
+  for (const table of TABLES) {
+    stats[table.model.tableName] = await backfillTable(workspace, table, {
+      execute,
+      logger,
+    });
+  }
+
+  logger.info(
+    { workspaceId: workspace.sId, stats },
+    "Completed workspace backfill"
+  );
+}
+
+makeScript({}, async ({ execute }, logger) => {
+  return runOnAllWorkspaces(async (workspace) => {
+    await backfillTablesForWorkspace(workspace, { execute, logger });
+  });
+});
diff --git a/front/migrations/db/migration_151.sql b/front/migrations/db/migration_151.sql
new file mode 100644
index 000000000000..5e7e973bef89
--- /dev/null
+++ b/front/migrations/db/migration_151.sql
@@ -0,0 +1,8 @@
+-- Migration created on Jan 23, 2025
+ALTER TABLE "public"."conversation_participants" ADD COLUMN "workspaceId" BIGINT REFERENCES "workspaces" ("id") ON DELETE RESTRICT ON UPDATE CASCADE;
+ALTER TABLE "public"."user_messages" ADD COLUMN "workspaceId" BIGINT REFERENCES "workspaces" ("id") ON DELETE RESTRICT ON UPDATE CASCADE;
+ALTER TABLE "public"."agent_messages" ADD COLUMN "workspaceId" BIGINT REFERENCES "workspaces" ("id") ON DELETE RESTRICT ON UPDATE CASCADE;
+ALTER TABLE "public"."messages" ADD COLUMN "workspaceId" BIGINT REFERENCES "workspaces" ("id") ON DELETE RESTRICT ON UPDATE CASCADE;
+ALTER TABLE "public"."message_reactions" ADD COLUMN "workspaceId" BIGINT REFERENCES "workspaces" ("id") ON DELETE RESTRICT ON UPDATE CASCADE;
+ALTER TABLE "public"."mentions" ADD COLUMN "workspaceId" BIGINT REFERENCES "workspaces" ("id") ON DELETE RESTRICT ON UPDATE CASCADE;
+ALTER TABLE "public"."content_fragments" ADD COLUMN "workspaceId" BIGINT REFERENCES "workspaces" ("id") ON DELETE RESTRICT ON UPDATE CASCADE;
\ No newline at end of file
diff --git a/front/migrations/db/migration_152.sql b/front/migrations/db/migration_152.sql
new file mode 100644
index 000000000000..f389c4e3b11d
--- /dev/null
+++ b/front/migrations/db/migration_152.sql
@@ -0,0 +1,31 @@
+-- The backfill script is: 20250123_backfill_workspace_id_conversation_related_models
+-- Run psql with --set=backfilled=1 argument once you have run the script.
+
+CREATE OR REPLACE FUNCTION perform_migration(backfilled boolean DEFAULT false)
+RETURNS VARCHAR AS $$
+BEGIN
+    IF NOT backfilled THEN
+        RAISE NOTICE 'The backfill script: 20250123_backfill_workspace_id_conversation_related_models is required before applying this migration. If you already did it, run psql with --set=backfilled=1 argument.';
+    END IF;
+
+-- Migration created on Jan 23, 2025
+ALTER TABLE "public"."conversation_participants" ALTER COLUMN "workspaceId" SET NOT NULL;
+ALTER TABLE "public"."user_messages" ALTER COLUMN "workspaceId" SET NOT NULL;
+ALTER TABLE "public"."agent_messages" ALTER COLUMN "workspaceId" SET NOT NULL;
+ALTER TABLE "public"."messages" ALTER COLUMN "workspaceId" SET NOT NULL;
+ALTER TABLE "public"."message_reactions" ALTER COLUMN "workspaceId" SET NOT NULL;
+ALTER TABLE "public"."mentions" ALTER COLUMN "workspaceId" SET NOT NULL;
+ALTER TABLE "public"."content_fragments" ALTER COLUMN "workspaceId" SET NOT NULL;
+
+    RETURN 'success';
+END;
+$$ LANGUAGE plpgsql;
+
+\if :{?backfilled}
+    SELECT perform_migration(:'backfilled'::boolean);
+\else
+    \echo '!! Migration was NOT applied !!'
+    \echo 'The backfill script: 20250123_backfill_workspace_id_conversation_related_models is required before applying this migration. If you already did it, run psql with --set=backfilled=1 argument.'
+\endif
+
+DROP FUNCTION perform_migration(boolean);
\ No newline at end of file