diff --git a/.changeset/quick-suns-appear.md b/.changeset/quick-suns-appear.md new file mode 100644 index 000000000..90ba6c8ec --- /dev/null +++ b/.changeset/quick-suns-appear.md @@ -0,0 +1,5 @@ +--- +'@mastra/core': patch +--- + +Fixed race condition when multiple storage methods attempt to initialize the db at the same time diff --git a/packages/core/src/storage/base.ts b/packages/core/src/storage/base.ts index 8a3a10723..527337167 100644 --- a/packages/core/src/storage/base.ts +++ b/packages/core/src/storage/base.ts @@ -18,7 +18,7 @@ export abstract class MastraStorage extends MastraBase { /** @deprecated import { TABLE_TRACES } from '@mastra/core/storage' instead */ static readonly TABLE_TRACES = TABLE_TRACES; - hasInit = false; + protected hasInitialized: null | Promise = null; constructor({ name }: { name: string }) { super({ @@ -149,113 +149,116 @@ export abstract class MastraStorage extends MastraBase { } async init(): Promise { - if (this.hasInit) { + // to prevent race conditions, await any current init + if (await this.hasInitialized) { return; } - await this.createTable({ - tableName: TABLE_WORKFLOW_SNAPSHOT, - schema: { - workflow_name: { - type: 'text', - }, - run_id: { - type: 'text', - }, - snapshot: { - type: 'text', - }, - createdAt: { - type: 'timestamp', - }, - updatedAt: { - type: 'timestamp', - }, - }, - }); - - await this.createTable({ - tableName: TABLE_EVALS, - schema: { - input: { - type: 'text', - }, - output: { - type: 'text', - }, - result: { - type: 'jsonb', + this.hasInitialized = Promise.all([ + this.createTable({ + tableName: TABLE_WORKFLOW_SNAPSHOT, + schema: { + workflow_name: { + type: 'text', + }, + run_id: { + type: 'text', + }, + snapshot: { + type: 'text', + }, + createdAt: { + type: 'timestamp', + }, + updatedAt: { + type: 'timestamp', + }, }, - agent_name: { - type: 'text', + }), + + this.createTable({ + tableName: TABLE_EVALS, + schema: { + input: { + type: 'text', + }, + output: { + type: 'text', + }, + result: { + type: 'jsonb', + }, + agent_name: { + type: 'text', + }, + metric_name: { + type: 'text', + }, + instructions: { + type: 'text', + }, + test_info: { + type: 'jsonb', + nullable: true, + }, + global_run_id: { + type: 'text', + }, + run_id: { + type: 'text', + }, + created_at: { + type: 'timestamp', + }, }, - metric_name: { - type: 'text', + }), + + this.createTable({ + tableName: TABLE_THREADS, + schema: { + id: { type: 'text', nullable: false, primaryKey: true }, + resourceId: { type: 'text', nullable: false }, + title: { type: 'text', nullable: false }, + metadata: { type: 'text', nullable: true }, + createdAt: { type: 'timestamp', nullable: false }, + updatedAt: { type: 'timestamp', nullable: false }, }, - instructions: { - type: 'text', + }), + + this.createTable({ + tableName: TABLE_MESSAGES, + schema: { + id: { type: 'text', nullable: false, primaryKey: true }, + thread_id: { type: 'text', nullable: false }, + content: { type: 'text', nullable: false }, + role: { type: 'text', nullable: false }, + type: { type: 'text', nullable: false }, + createdAt: { type: 'timestamp', nullable: false }, }, - test_info: { - type: 'jsonb', - nullable: true, + }), + + this.createTable({ + tableName: TABLE_TRACES, + schema: { + id: { type: 'text', nullable: false, primaryKey: true }, + parentSpanId: { type: 'text', nullable: true }, + name: { type: 'text', nullable: false }, + traceId: { type: 'text', nullable: false }, + scope: { type: 'text', nullable: false }, + kind: { type: 'integer', nullable: false }, + attributes: { type: 'jsonb', nullable: true }, + status: { type: 'jsonb', nullable: true }, + events: { type: 'jsonb', nullable: true }, + links: { type: 'jsonb', nullable: true }, + other: { type: 'text', nullable: true }, + startTime: { type: 'bigint', nullable: false }, + endTime: { type: 'bigint', nullable: false }, + createdAt: { type: 'timestamp', nullable: false }, }, - global_run_id: { - type: 'text', - }, - run_id: { - type: 'text', - }, - created_at: { - type: 'timestamp', - }, - }, - }); - - await this.createTable({ - tableName: TABLE_THREADS, - schema: { - id: { type: 'text', nullable: false, primaryKey: true }, - resourceId: { type: 'text', nullable: false }, - title: { type: 'text', nullable: false }, - metadata: { type: 'text', nullable: true }, - createdAt: { type: 'timestamp', nullable: false }, - updatedAt: { type: 'timestamp', nullable: false }, - }, - }); - - await this.createTable({ - tableName: TABLE_MESSAGES, - schema: { - id: { type: 'text', nullable: false, primaryKey: true }, - thread_id: { type: 'text', nullable: false }, - content: { type: 'text', nullable: false }, - role: { type: 'text', nullable: false }, - type: { type: 'text', nullable: false }, - createdAt: { type: 'timestamp', nullable: false }, - }, - }); - - await this.createTable({ - tableName: TABLE_TRACES, - schema: { - id: { type: 'text', nullable: false, primaryKey: true }, - parentSpanId: { type: 'text', nullable: true }, - name: { type: 'text', nullable: false }, - traceId: { type: 'text', nullable: false }, - scope: { type: 'text', nullable: false }, - kind: { type: 'integer', nullable: false }, - attributes: { type: 'jsonb', nullable: true }, - status: { type: 'jsonb', nullable: true }, - events: { type: 'jsonb', nullable: true }, - links: { type: 'jsonb', nullable: true }, - other: { type: 'text', nullable: true }, - startTime: { type: 'bigint', nullable: false }, - endTime: { type: 'bigint', nullable: false }, - createdAt: { type: 'timestamp', nullable: false }, - }, - }); + }), + ]).then(() => true); - this.hasInit = true; + await this.hasInitialized; } async persistWorkflowSnapshot({ @@ -290,7 +293,7 @@ export abstract class MastraStorage extends MastraBase { workflowName: string; runId: string; }): Promise { - if (!this.hasInit) { + if (!this.hasInitialized) { await this.init(); } this.logger.debug('Loading workflow snapshot', { workflowName, runId });