diff --git a/apps/backend/src/db.ts b/apps/backend/src/db.ts deleted file mode 100644 index 40b4ae5e6..000000000 --- a/apps/backend/src/db.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { IQueryBuilder, startTransaction } from "@undb/persistence/server" -import { IsolationLevel } from "kysely" - -export const withTransaction = - (qb: IQueryBuilder, level: IsolationLevel = "read committed") => - (callback: () => Promise): Promise => { - return new Promise((resolve, reject) => { - return qb - .transaction() - .setIsolationLevel(level) - .execute(async (trx) => { - startTransaction(trx) - try { - const result = await callback() - resolve(result) - } catch (error) { - reject(error) - } - }) - }) - } diff --git a/apps/backend/src/modules/auth/auth.ts b/apps/backend/src/modules/auth/auth.ts index c770e1fbe..6d500dcdf 100644 --- a/apps/backend/src/modules/auth/auth.ts +++ b/apps/backend/src/modules/auth/auth.ts @@ -14,7 +14,7 @@ import { None, Option, Some } from "@undb/domain" import { env } from "@undb/env" import { createLogger } from "@undb/logger" import { type IMailService, injectMailService } from "@undb/mail" -import { type IQueryBuilder, getCurrentTransaction, injectQueryBuilder } from "@undb/persistence/server" +import { type IQueryBuilder, type ITxContext, injectQueryBuilder, injectTxCTX } from "@undb/persistence/server" import { type ISpaceService, injectSpaceService } from "@undb/space" import { Context, Elysia, t } from "elysia" import type { Session, User } from "lucia" @@ -25,7 +25,6 @@ import { alphabet, generateRandomString, sha256 } from "oslo/crypto" import { encodeHex } from "oslo/encoding" import { omit } from "radash" import { v7 } from "uuid" -import { withTransaction } from "../../db" import { injectLucia } from "./auth.provider" import { OAuth } from "./oauth/oauth" @@ -52,10 +51,12 @@ export class Auth { private readonly mailService: IMailService, @injectLucia() private readonly lucia: Lucia, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async #generateEmailVerificationCode(userId: string, email: string): Promise { - const tx = getCurrentTransaction() + const tx = this.txContext.getCurrentTransaction() await tx.deleteFrom("undb_email_verification_code").where("user_id", "=", userId).execute() const code = env.UNDB_MOCK_MAIL_CODE || generateRandomString(6, alphabet("0-9")) await tx @@ -71,29 +72,32 @@ export class Auth { } async #verifyVerificationCode(user: User, code: string): Promise { - return (getCurrentTransaction() ?? this.queryBuilder).transaction().execute(async (tx) => { - const databaseCode = await tx - .selectFrom("undb_email_verification_code") - .selectAll() - .where("user_id", "=", user.id) - .executeTakeFirst() - if (!databaseCode || databaseCode.code !== code) { - return false - } - await tx.deleteFrom("undb_email_verification_code").where("id", "=", databaseCode.id).execute() + return this.txContext + .getCurrentTransaction() + .transaction() + .execute(async (tx) => { + const databaseCode = await tx + .selectFrom("undb_email_verification_code") + .selectAll() + .where("user_id", "=", user.id) + .executeTakeFirst() + if (!databaseCode || databaseCode.code !== code) { + return false + } + await tx.deleteFrom("undb_email_verification_code").where("id", "=", databaseCode.id).execute() - if (!isWithinExpirationDate(new Date(databaseCode.expires_at))) { - return false - } - if (databaseCode.email !== user.email) { - return false - } - return true - }) + if (!isWithinExpirationDate(new Date(databaseCode.expires_at))) { + return false + } + if (databaseCode.email !== user.email) { + return false + } + return true + }) } async #createPasswordResetToken(userId: string): Promise { - const db = getCurrentTransaction() ?? this.queryBuilder + const db = this.txContext.getCurrentTransaction() await db.deleteFrom("undb_password_reset_token").where("user_id", "=", userId).execute() const tokenId = generateIdFromEntropySize(25) // 40 character const tokenHash = encodeHex(await sha256(new TextEncoder().encode(tokenId))) @@ -206,8 +210,9 @@ export class Auth { }, }) - await withTransaction(this.queryBuilder)(async () => { - await getCurrentTransaction() + await this.txContext.withTransaction(async () => { + await this.txContext + .getCurrentTransaction() .insertInto("undb_user") .values({ email: adminEmail, @@ -326,8 +331,9 @@ export class Auth { }, }) - await withTransaction(this.queryBuilder)(async () => { - await getCurrentTransaction() + await this.txContext.withTransaction(async () => { + await this.txContext + .getCurrentTransaction() .insertInto("undb_user") .values({ email, @@ -470,9 +476,9 @@ export class Auth { .post( "/api/reset-password", async (ctx) => { - return withTransaction(this.queryBuilder)(async () => { + return this.txContext.withTransaction(async () => { const email = ctx.body.email - const tx = getCurrentTransaction() ?? this.queryBuilder + const tx = this.txContext.getCurrentTransaction() const user = await tx.selectFrom("undb_user").selectAll().where("email", "=", email).executeTakeFirst() if (!user) { return new Response(null, { @@ -505,8 +511,8 @@ export class Auth { .post( "/api/reset-password/:token", async (ctx) => { - return withTransaction(this.queryBuilder)(async () => { - const tx = getCurrentTransaction() ?? this.queryBuilder + return this.txContext.withTransaction(async () => { + const tx = this.txContext.getCurrentTransaction() const password = ctx.body.password const verificationToken = ctx.params.token @@ -589,7 +595,8 @@ export class Auth { } await this.lucia.invalidateUserSessions(user.id) - await (getCurrentTransaction() ?? this.queryBuilder) + await this.txContext + .getCurrentTransaction() .updateTable("undb_user") .set("email_verified", true) .where("id", "=", user.id) @@ -615,7 +622,7 @@ export class Auth { .get( "/invitation/:invitationId/accept", async (ctx) => { - return withTransaction(this.queryBuilder)(async () => { + return this.txContext.withTransaction(async () => { const { invitationId } = ctx.params await this.commandBus.execute(new AcceptInvitationCommand({ id: invitationId })) diff --git a/apps/backend/src/modules/auth/oauth/github.ts b/apps/backend/src/modules/auth/oauth/github.ts index 822999f48..0afcb3ff1 100644 --- a/apps/backend/src/modules/auth/oauth/github.ts +++ b/apps/backend/src/modules/auth/oauth/github.ts @@ -2,14 +2,13 @@ import { type ISpaceMemberService, injectSpaceMemberService } from "@undb/authz" import { setContextValue } from "@undb/context/server" import { singleton } from "@undb/di" import { createLogger } from "@undb/logger" -import { type IQueryBuilder, getCurrentTransaction, injectQueryBuilder } from "@undb/persistence/server" +import { type IQueryBuilder, type ITxContext, injectQueryBuilder, injectTxCTX } from "@undb/persistence/server" import { type ISpaceService, injectSpaceService } from "@undb/space" import { GitHub } from "arctic" import { Elysia } from "elysia" import { type Lucia, generateIdFromEntropySize } from "lucia" import { serializeCookie } from "oslo/cookie" import { OAuth2RequestError, generateState } from "oslo/oauth2" -import { withTransaction } from "../../../db" import { injectLucia } from "../auth.provider" import { injectGithubProvider } from "./github.provider" @@ -26,6 +25,8 @@ export class GithubOAuth { private readonly github: GitHub, @injectLucia() private readonly lucia: Lucia, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} private logger = createLogger(GithubOAuth.name) @@ -34,7 +35,7 @@ export class GithubOAuth { return new Elysia() .get("/login/github", async (ctx) => { const state = generateState() - const url = await this.github.createAuthorizationURL(state, { scopes: ["user:email"] }) + const url = this.github.createAuthorizationURL(state, ["user:email"]) return new Response(null, { status: 302, headers: { @@ -143,8 +144,8 @@ export class GithubOAuth { }) } const userId = generateIdFromEntropySize(10) // 16 characters long - const space = await withTransaction(this.queryBuilder)(async () => { - const tx = getCurrentTransaction() + const space = await this.txContext.withTransaction(async () => { + const tx = this.txContext.getCurrentTransaction() await tx .insertInto("undb_user") .values({ diff --git a/apps/backend/src/modules/auth/oauth/google.ts b/apps/backend/src/modules/auth/oauth/google.ts index 975d6b967..4a2879324 100644 --- a/apps/backend/src/modules/auth/oauth/google.ts +++ b/apps/backend/src/modules/auth/oauth/google.ts @@ -2,14 +2,13 @@ import { type ISpaceMemberService, injectSpaceMemberService } from "@undb/authz" import { setContextValue } from "@undb/context/server" import { singleton } from "@undb/di" import { createLogger } from "@undb/logger" -import { type IQueryBuilder, getCurrentTransaction, injectQueryBuilder } from "@undb/persistence/server" +import { type IQueryBuilder, type ITxContext, injectQueryBuilder, injectTxCTX } from "@undb/persistence/server" import { type ISpaceService, injectSpaceService } from "@undb/space" import { Google, generateCodeVerifier } from "arctic" import { env } from "bun" import { Elysia } from "elysia" import { type Lucia, generateIdFromEntropySize } from "lucia" import { OAuth2RequestError, generateState } from "oslo/oauth2" -import { withTransaction } from "../../../db" import { injectLucia } from "../auth.provider" import { injectGoogleProvider } from "./google.provider" @@ -26,6 +25,8 @@ export class GoogleOAuth { private readonly google: Google, @injectLucia() private readonly lucia: Lucia, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} private logger = createLogger(GoogleOAuth.name) @@ -35,9 +36,7 @@ export class GoogleOAuth { .get("/login/google", async (ctx) => { const state = generateState() const codeVerifier = generateCodeVerifier() - const url = await this.google.createAuthorizationURL(state, codeVerifier, { - scopes: ["email", "profile"], - }) + const url = this.google.createAuthorizationURL(state, codeVerifier, ["email", "profile"]) ctx.cookie["state"].set({ value: state, @@ -133,8 +132,8 @@ export class GoogleOAuth { }) } const userId = generateIdFromEntropySize(10) // 16 characters long - const space = await withTransaction(this.queryBuilder)(async () => { - const tx = getCurrentTransaction() + const space = await this.txContext.withTransaction(async () => { + const tx = this.txContext.getCurrentTransaction() await tx .insertInto("undb_user") .values({ diff --git a/apps/backend/src/modules/openapi/record.openapi.ts b/apps/backend/src/modules/openapi/record.openapi.ts index 25af3eadc..9b661c3b1 100644 --- a/apps/backend/src/modules/openapi/record.openapi.ts +++ b/apps/backend/src/modules/openapi/record.openapi.ts @@ -13,7 +13,8 @@ import { import { CommandBus, QueryBus } from "@undb/cqrs" import { inject, singleton } from "@undb/di" import { Option, type ICommandBus, type IQueryBus, type PaginatedDTO } from "@undb/domain" -import { injectQueryBuilder, type IQueryBuilder } from "@undb/persistence/server" +import type { ITxContext } from "@undb/persistence/server" +import { injectQueryBuilder, injectTxCTX, type IQueryBuilder } from "@undb/persistence/server" import { GetAggregatesQuery, GetPivotDataQuery, @@ -22,7 +23,6 @@ import { } from "@undb/queries" import { RecordDO, type IRecordReadableValueDTO } from "@undb/table" import Elysia, { t } from "elysia" -import { withTransaction } from "../../db" @singleton() export class RecordOpenApi { @@ -34,6 +34,8 @@ export class RecordOpenApi { private readonly commandBus: ICommandBus, @injectQueryBuilder() private readonly qb: IQueryBuilder, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} public route() { @@ -184,7 +186,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute(new CreateRecordCommand({ baseName, tableName, values: ctx.body.values })), ) }, @@ -203,7 +205,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute(new CreateRecordsCommand({ baseName, tableName, records: ctx.body.records })), ) }, @@ -222,7 +224,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute( new UpdateRecordCommand({ tableName, @@ -248,7 +250,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute( new BulkUpdateRecordsCommand({ tableName, @@ -278,7 +280,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute(new DuplicateRecordCommand({ baseName, tableName, id: ctx.params.recordId })), ) }, @@ -298,7 +300,7 @@ export class RecordOpenApi { const tableName = decodeURIComponent(ctx.params.tableName) const recordId = ctx.params.recordId const field = ctx.params.field - return withTransaction(this.qb)(async () => { + return this.txContext.withTransaction(async () => { const result = (await this.commandBus.execute( new TriggerRecordButtonCommand({ baseName, tableName, recordId, field }), )) as Option @@ -326,7 +328,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute( new BulkDuplicateRecordsCommand({ baseName, @@ -352,7 +354,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute(new DeleteRecordCommand({ baseName, tableName, id: ctx.params.recordId })), ) }, @@ -370,7 +372,7 @@ export class RecordOpenApi { async (ctx) => { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute( new BulkDeleteRecordsCommand({ baseName, @@ -397,7 +399,7 @@ export class RecordOpenApi { const baseName = decodeURIComponent(ctx.params.baseName) const tableName = decodeURIComponent(ctx.params.tableName) const formName = decodeURIComponent(ctx.params.formName) - return withTransaction(this.qb)(() => + return this.txContext.withTransaction(() => this.commandBus.execute( new SubmitFormCommand({ baseName, tableName, form: formName, values: ctx.body.values }), ), diff --git a/apps/backend/src/modules/space/space.module.ts b/apps/backend/src/modules/space/space.module.ts index e9c8500e8..3737fc4cc 100644 --- a/apps/backend/src/modules/space/space.module.ts +++ b/apps/backend/src/modules/space/space.module.ts @@ -4,11 +4,11 @@ import { type IContext, injectContext } from "@undb/context" import { getCurrentMember } from "@undb/context/server" import { CommandBus } from "@undb/cqrs" import { inject, singleton } from "@undb/di" -import { injectQueryBuilder, type IQueryBuilder } from "@undb/persistence/server" +import type { ITxContext } from "@undb/persistence/server" +import { injectQueryBuilder, injectTxCTX, type IQueryBuilder } from "@undb/persistence/server" import { injectSpaceService, type ISpaceService } from "@undb/space" import Elysia, { t } from "elysia" import { type Lucia } from "lucia" -import { withTransaction } from "../../db" import { injectLucia } from "../auth/auth.provider" @singleton() @@ -24,6 +24,8 @@ export class SpaceModule { private readonly qb: IQueryBuilder, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} public route() { return new Elysia() @@ -66,7 +68,7 @@ export class SpaceModule { .delete( "/api/space", async (ctx) => { - return withTransaction(this.qb)(async () => { + return this.txContext.withTransaction(async () => { await this.commandBus.execute(new DeleteSpaceCommand({})) const userId = this.context.mustGetCurrentUserId() diff --git a/apps/backend/src/registry/db.registry.ts b/apps/backend/src/registry/db.registry.ts index 0b970b514..f4c863ac3 100644 --- a/apps/backend/src/registry/db.registry.ts +++ b/apps/backend/src/registry/db.registry.ts @@ -27,6 +27,7 @@ import { createSqliteQueryBuilder, createTursoClient, createTursoQueryBuilder, + CTX, DashboardOutboxService, DashboardQueryRepository, DashboardRepository, @@ -47,6 +48,9 @@ import { TableQueryRepository, TableRepository, TemplateQueryRepository, + TX_CTX, + TxContext, + TxContextImpl, UserQueryRepository, UserRepository, WebhookQueryRepository, @@ -66,8 +70,14 @@ import { TEMPLATE_QUERY_REPOSITORY } from "@undb/template" import { USER_QUERY_REPOSITORY, USER_REPOSITORY, USER_SERVICE, UserService } from "@undb/user" import { WEBHOOK_QUERY_REPOSITORY, WEBHOOK_REPOSITORY } from "@undb/webhook" import Database from "bun:sqlite" +import { AsyncLocalStorage } from "node:async_hooks" + +const txContext = new AsyncLocalStorage() export const registerDb = () => { + container.register(CTX, { useValue: txContext }) + container.register(TX_CTX, TxContextImpl) + container.register(SQLITE_CLIENT, { useFactory: instanceCachingFactory(() => { if (env.UNDB_DB_PROVIDER === "sqlite" || !env.UNDB_DB_PROVIDER) { diff --git a/packages/persistence/src/api-token/api-token.repository.ts b/packages/persistence/src/api-token/api-token.repository.ts index 78717b642..1f9a4fd27 100644 --- a/packages/persistence/src/api-token/api-token.repository.ts +++ b/packages/persistence/src/api-token/api-token.repository.ts @@ -1,6 +1,7 @@ import { singleton } from "@undb/di" import type { ApiTokenDo, IApiTokenRepository } from "@undb/openapi" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" @@ -9,9 +10,12 @@ export class ApiTokenRepository implements IApiTokenRepository { constructor( @injectQueryBuilder() private readonly qb: IQueryBuilder, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async insert(token: ApiTokenDo): Promise { - await (getCurrentTransaction() ?? this.qb) + await this.txContext + .getCurrentTransaction() .insertInto("undb_api_token") .values({ id: token.id.value, @@ -24,7 +28,8 @@ export class ApiTokenRepository implements IApiTokenRepository { } async deleteOneById(id: string): Promise { - await (getCurrentTransaction() ?? this.qb) + await this.txContext + .getCurrentTransaction() .deleteFrom("undb_api_token") .where("undb_api_token.id", "=", id) .execute() diff --git a/packages/persistence/src/base/base.outbox-service.ts b/packages/persistence/src/base/base.outbox-service.ts index b34e51d76..b006ad7e2 100644 --- a/packages/persistence/src/base/base.outbox-service.ts +++ b/packages/persistence/src/base/base.outbox-service.ts @@ -1,7 +1,8 @@ import type { Base, IBaseOutboxService } from "@undb/base" import { injectContext, type IContext } from "@undb/context" import { singleton } from "@undb/di" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import { OutboxMapper } from "../outbox.mapper" @singleton() @@ -9,11 +10,13 @@ export class BaseOutboxService implements IBaseOutboxService { constructor( @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async save(r: Base): Promise { const values = r.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context)) if (!values.length) return - await getCurrentTransaction().insertInto("undb_outbox").values(values).execute() + await this.txContext.getCurrentTransaction().insertInto("undb_outbox").values(values).execute() r.removeEvents(r.domainEvents) } @@ -21,7 +24,7 @@ export class BaseOutboxService implements IBaseOutboxService { const values = d.flatMap((r) => r.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context))) if (!values.length) return - await getCurrentTransaction().insertInto("undb_outbox").values(values).execute() + await this.txContext.getCurrentTransaction().insertInto("undb_outbox").values(values).execute() for (const r of d) { r.removeEvents(r.domainEvents) } diff --git a/packages/persistence/src/base/base.repository.ts b/packages/persistence/src/base/base.repository.ts index fd82435cc..1109beb21 100644 --- a/packages/persistence/src/base/base.repository.ts +++ b/packages/persistence/src/base/base.repository.ts @@ -12,7 +12,8 @@ import { executionContext } from "@undb/context/server" import { inject, singleton } from "@undb/di" import { None, Some, type Option } from "@undb/domain" import { injectTableRepository, TableBaseIdSpecification, type ITableRepository } from "@undb/table" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { UnderlyingTableService } from "../underlying/underlying-table.service" @@ -35,10 +36,12 @@ export class BaseRepository implements IBaseRepository { private readonly underlyingTableService: UnderlyingTableService, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async find(spec: IBaseSpecification): Promise { - const tx = getCurrentTransaction() ?? this.qb + const tx = this.txContext.getCurrentTransaction() const bases = await tx .selectFrom("undb_base") .selectAll() @@ -52,7 +55,8 @@ export class BaseRepository implements IBaseRepository { return bases.map((base) => this.mapper.toDo(base)) } async findOne(spec: IBaseSpecification): Promise> { - const base = await (getCurrentTransaction() ?? this.qb) + const base = await this.txContext + .getCurrentTransaction() .selectFrom("undb_base") .selectAll() .where((eb) => { @@ -68,7 +72,8 @@ export class BaseRepository implements IBaseRepository { const spaceId = this.context.mustGetCurrentSpaceId() const spec = WithBaseId.fromString(id).and(new WithBaseSpaceId(spaceId)) - const base = await (getCurrentTransaction() ?? this.qb) + const base = await this.txContext + .getCurrentTransaction() .selectFrom("undb_base") .selectAll() .where((eb) => { @@ -84,7 +89,8 @@ export class BaseRepository implements IBaseRepository { const user = executionContext.getStore()?.user?.userId! const values = this.mapper.toEntity(base) - await getCurrentTransaction() + await this.txContext + .getCurrentTransaction() .insertInto("undb_base") .values({ ...values, @@ -103,7 +109,8 @@ export class BaseRepository implements IBaseRepository { const visitor = new BaseMutateVisitor() spec.accept(visitor) - await getCurrentTransaction() + await this.txContext + .getCurrentTransaction() .updateTable("undb_base") .set({ ...visitor.data, updated_by: userId, updated_at: new Date().toISOString() }) .where((eb) => eb.eb("id", "=", base.id.value)) @@ -112,7 +119,7 @@ export class BaseRepository implements IBaseRepository { } async deleteOneById(id: string): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const tables = await this.tableRepository.find(Some(new TableBaseIdSpecification(id))) const tableIds = tables.map((t) => t.id.value) diff --git a/packages/persistence/src/ctx.interface.ts b/packages/persistence/src/ctx.interface.ts new file mode 100644 index 000000000..aba8b3d9c --- /dev/null +++ b/packages/persistence/src/ctx.interface.ts @@ -0,0 +1,8 @@ +import type { AnonymousTx, Tx } from "./qb.type" + +export interface ITxContext { + withTransaction: (callback: () => Promise) => Promise + startTransaction: (tx: any) => void + getCurrentTransaction: () => Tx + getAnonymousTransaction: () => AnonymousTx +} diff --git a/packages/persistence/src/ctx.provider.ts b/packages/persistence/src/ctx.provider.ts new file mode 100644 index 000000000..a83c551ad --- /dev/null +++ b/packages/persistence/src/ctx.provider.ts @@ -0,0 +1,4 @@ +import { inject } from "@undb/di" + +export const TX_CTX = Symbol("tx_ctx") +export const injectTxCTX = () => inject(TX_CTX) diff --git a/packages/persistence/src/ctx.ts b/packages/persistence/src/ctx.ts index ceabbfb1e..10d74b063 100644 --- a/packages/persistence/src/ctx.ts +++ b/packages/persistence/src/ctx.ts @@ -1,20 +1,49 @@ +import { inject, singleton } from "@undb/di" import { AsyncLocalStorage } from "node:async_hooks" -import type { AnonymousTx, Tx } from "./qb" +import type { ITxContext } from "./ctx.interface" +import type { IQueryBuilder } from "./qb" +import { injectQueryBuilder } from "./qb.provider" +import type { AnonymousTx, Tx } from "./qb.type" export interface TxContext { trx: Tx | AnonymousTx } -export const txContext = new AsyncLocalStorage() +export const CTX = Symbol("ctx") +export const injectContext = () => inject(CTX) -export function startTransaction(tx: any) { - txContext.enterWith({ trx: tx }) -} +@singleton() +export class TxContextImpl implements ITxContext { + constructor( + @injectQueryBuilder() + private readonly qb: IQueryBuilder, + @injectContext() + private readonly context: AsyncLocalStorage, + ) {} -export function getCurrentTransaction() { - return txContext.getStore()?.trx as Tx -} + withTransaction(callback: () => Promise): Promise { + return this.qb.transaction().execute(async (trx) => { + return new Promise(async (resolve, reject) => { + this.startTransaction(trx) + try { + const result = await callback() + resolve(result) + } catch (error) { + reject(error) + } + }) + }) + } + + startTransaction(tx: any) { + this.context.enterWith({ trx: tx }) + } + + getCurrentTransaction() { + return (this.context.getStore()?.trx ?? this.qb) as Tx + } -export function getAnonymousTransaction() { - return txContext.getStore()?.trx as AnonymousTx + getAnonymousTransaction() { + return (this.context.getStore()?.trx ?? this.qb) as AnonymousTx + } } diff --git a/packages/persistence/src/dashboard/dashboard.outbox-service.ts b/packages/persistence/src/dashboard/dashboard.outbox-service.ts index b37581d7f..7766e0a29 100644 --- a/packages/persistence/src/dashboard/dashboard.outbox-service.ts +++ b/packages/persistence/src/dashboard/dashboard.outbox-service.ts @@ -1,7 +1,8 @@ import { injectContext, type IContext } from "@undb/context" import type { Dashboard, IDashboardOutboxService } from "@undb/dashboard" import { singleton } from "@undb/di" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import { OutboxMapper } from "../outbox.mapper" @singleton() @@ -9,11 +10,13 @@ export class DashboardOutboxService implements IDashboardOutboxService { constructor( @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async save(r: Dashboard): Promise { const values = r.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context)) if (!values.length) return - await getCurrentTransaction().insertInto("undb_outbox").values(values).execute() + await this.txContext.getCurrentTransaction().insertInto("undb_outbox").values(values).execute() r.removeEvents(r.domainEvents) } @@ -21,7 +24,7 @@ export class DashboardOutboxService implements IDashboardOutboxService { const values = d.flatMap((r) => r.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context))) if (!values.length) return - await getCurrentTransaction().insertInto("undb_outbox").values(values).execute() + await this.txContext.getCurrentTransaction().insertInto("undb_outbox").values(values).execute() for (const r of d) { r.removeEvents(r.domainEvents) } diff --git a/packages/persistence/src/dashboard/dashboard.query-repository.ts b/packages/persistence/src/dashboard/dashboard.query-repository.ts index 6b89517c9..269f111ab 100644 --- a/packages/persistence/src/dashboard/dashboard.query-repository.ts +++ b/packages/persistence/src/dashboard/dashboard.query-repository.ts @@ -6,7 +6,8 @@ import { } from "@undb/dashboard" import { inject, singleton } from "@undb/di" import { None, Some, type Option } from "@undb/domain" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { DashboardFilterVisitor } from "./dashboard.filter-visitor" @@ -20,10 +21,12 @@ export class DashboardQueryRepository implements IDashboardQueryRepository { private readonly mapper: DashboardMapper, @injectQueryBuilder() private readonly qb: IQueryBuilder, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async find(spec: Option): Promise { - const qb = getCurrentTransaction() ?? this.qb + const qb = this.txContext.getCurrentTransaction() const dashboards = await qb .selectFrom("undb_dashboard") .selectAll() @@ -43,7 +46,7 @@ export class DashboardQueryRepository implements IDashboardQueryRepository { async findOneById(id: string): Promise> { const spec = WithDashboardId.fromString(id) - const qb = getCurrentTransaction() ?? this.qb + const qb = this.txContext.getCurrentTransaction() const dashboard = await this.qb .selectFrom("undb_dashboard") .selectAll() diff --git a/packages/persistence/src/dashboard/dashboard.repository.ts b/packages/persistence/src/dashboard/dashboard.repository.ts index 269502137..f24cc52b8 100644 --- a/packages/persistence/src/dashboard/dashboard.repository.ts +++ b/packages/persistence/src/dashboard/dashboard.repository.ts @@ -10,7 +10,8 @@ import { } from "@undb/dashboard" import { inject, singleton } from "@undb/di" import { None, Some, type Option } from "@undb/domain" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { DashboardFilterVisitor } from "./dashboard.filter-visitor" @@ -29,10 +30,12 @@ export class DashboardRepository implements IDashboardRepository { private readonly qb: IQueryBuilder, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async find(spec: IDashboardSpecification): Promise { - const tx = getCurrentTransaction() ?? this.qb + const tx = this.txContext.getCurrentTransaction() const dashboards = await tx .selectFrom("undb_dashboard") .selectAll() @@ -47,7 +50,7 @@ export class DashboardRepository implements IDashboardRepository { return dashboards.map((dashboard) => this.mapper.toDo(dashboard)) } async findOne(spec: IDashboardSpecification): Promise> { - const tx = getCurrentTransaction() ?? this.qb + const tx = this.txContext.getCurrentTransaction() const dashboard = await tx .selectFrom("undb_dashboard") .selectAll() @@ -65,7 +68,7 @@ export class DashboardRepository implements IDashboardRepository { const spaceId = this.context.mustGetCurrentSpaceId() const spec = WithDashboardId.fromString(id).and(new WithDashboardSpaceId(spaceId)) - const tx = getCurrentTransaction() ?? this.qb + const tx = this.txContext.getCurrentTransaction() const dashboard = await tx .selectFrom("undb_dashboard") .selectAll() @@ -83,7 +86,7 @@ export class DashboardRepository implements IDashboardRepository { const user = this.context.mustGetCurrentUserId() const values = this.mapper.toEntity(dashboard) - const qb = getCurrentTransaction() ?? this.qb + const qb = this.txContext.getCurrentTransaction() await qb .insertInto("undb_dashboard") .values({ @@ -130,7 +133,7 @@ export class DashboardRepository implements IDashboardRepository { } async deleteOneById(id: string): Promise { - const qb = getCurrentTransaction() ?? this.qb + const qb = this.txContext.getCurrentTransaction() await qb .deleteFrom("undb_dashboard_table_id_mapping") diff --git a/packages/persistence/src/member/invitation.query-repository.ts b/packages/persistence/src/member/invitation.query-repository.ts index 5b128ebab..a9162b0e4 100644 --- a/packages/persistence/src/member/invitation.query-repository.ts +++ b/packages/persistence/src/member/invitation.query-repository.ts @@ -2,7 +2,8 @@ import type { IInvitationQueryRepository, InvitationCompositeSpecification, Invi import { injectContext, type IContext } from "@undb/context" import { singleton } from "@undb/di" import { None, Some, type Option } from "@undb/domain" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { InvitationFilterVisitor } from "./invitation.filter-visitor" @@ -14,9 +15,12 @@ export class InvitationQueryRepository implements IInvitationQueryRepository { private readonly qb: IQueryBuilder, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async findOneById(id: string): Promise> { - const invitation = await (getCurrentTransaction() ?? this.qb) + const invitation = await this.txContext + .getCurrentTransaction() .selectFrom("undb_invitation") .selectAll() .where("id", "=", id) @@ -36,7 +40,8 @@ export class InvitationQueryRepository implements IInvitationQueryRepository { } async findOne(spec: InvitationCompositeSpecification): Promise> { - const invitation = await (getCurrentTransaction() ?? this.qb) + const invitation = await this.txContext + .getCurrentTransaction() .selectFrom("undb_invitation") .selectAll() .where((eb) => { @@ -61,7 +66,8 @@ export class InvitationQueryRepository implements IInvitationQueryRepository { } async find(spec: Option): Promise { - const invitations = await (getCurrentTransaction() ?? this.qb) + const invitations = await this.txContext + .getCurrentTransaction() .selectFrom("undb_invitation") .selectAll() .where((eb) => { diff --git a/packages/persistence/src/member/invitation.repository.ts b/packages/persistence/src/member/invitation.repository.ts index 4a92bfdb2..61ac7d356 100644 --- a/packages/persistence/src/member/invitation.repository.ts +++ b/packages/persistence/src/member/invitation.repository.ts @@ -1,7 +1,8 @@ import type { IInvitationRepository, InvitationCompositeSpecification, InvitationDo } from "@undb/authz" import { injectContext, type IContext } from "@undb/context" import { singleton } from "@undb/di" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import { InvitationMutationVisitor } from "./invitation.mutation-visitor" @singleton() @@ -9,15 +10,17 @@ export class InvitationRepository implements IInvitationRepository { constructor( @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async deleteOneById(id: string): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() await trx.deleteFrom("undb_invitation").where("id", "=", id).execute() } async updateOneById(id: string, spec: InvitationCompositeSpecification): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() await trx .updateTable("undb_invitation") @@ -31,7 +34,7 @@ export class InvitationRepository implements IInvitationRepository { } async upsert(invitation: InvitationDo): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() await trx .insertInto("undb_invitation") @@ -55,7 +58,7 @@ export class InvitationRepository implements IInvitationRepository { } async insert(invitation: InvitationDo): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() await trx .insertInto("undb_invitation") diff --git a/packages/persistence/src/member/space-member.repository.ts b/packages/persistence/src/member/space-member.repository.ts index 43a8b9429..34850d718 100644 --- a/packages/persistence/src/member/space-member.repository.ts +++ b/packages/persistence/src/member/space-member.repository.ts @@ -1,7 +1,8 @@ -import { SpaceMember, SpaceMemberComositeSpecification, type ISpaceMemberRepository } from "@undb/authz" +import { SpaceMember,SpaceMemberComositeSpecification,type ISpaceMemberRepository } from "@undb/authz" import { singleton } from "@undb/di" -import { None, Some, type Option } from "@undb/domain" -import { getCurrentTransaction } from "../ctx" +import { None,Some,type Option } from "@undb/domain" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { SpaceMemberFilterVisitor } from "./space-member.filter-visitor" @@ -11,10 +12,12 @@ export class SpaceMemberRepository implements ISpaceMemberRepository { constructor( @injectQueryBuilder() private readonly qb: IQueryBuilder, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async exists(spec: SpaceMemberComositeSpecification): Promise { - const user = await (getCurrentTransaction() ?? this.qb) + const user = await this.txContext.getCurrentTransaction() .selectFrom("undb_space_member") .selectAll() .where((eb) => { @@ -28,7 +31,7 @@ export class SpaceMemberRepository implements ISpaceMemberRepository { } async findOne(spec: SpaceMemberComositeSpecification): Promise> { - const member = await (getCurrentTransaction() ?? this.qb) + const member = await this.txContext.getCurrentTransaction() .selectFrom("undb_space_member") .selectAll() .where((eb) => { @@ -57,7 +60,7 @@ export class SpaceMemberRepository implements ISpaceMemberRepository { } async insert(member: SpaceMember): Promise { const json = member.toJSON() - await getCurrentTransaction() + await this.txContext.getCurrentTransaction() .insertInto("undb_space_member") .values({ id: json.id, diff --git a/packages/persistence/src/qb.ts b/packages/persistence/src/qb.ts index 9ee14a43b..d0e05722e 100644 --- a/packages/persistence/src/qb.ts +++ b/packages/persistence/src/qb.ts @@ -2,7 +2,7 @@ import type { Client } from "@libsql/client" import { LibsqlDialect } from "@libsql/kysely-libsql" import { createLogger } from "@undb/logger" import { Database as SqliteDatabase } from "bun:sqlite" -import { Kysely, ParseJSONResultsPlugin, sql, Transaction, type Dialect, type RawBuilder } from "kysely" +import { Kysely, ParseJSONResultsPlugin, sql, type Dialect, type RawBuilder } from "kysely" import { BunSqliteDialect } from "kysely-bun-sqlite" import { type Database } from "./db" @@ -56,9 +56,6 @@ export function createSqliteQueryBuilder(sqlite: SqliteDatabase) { export type IQueryBuilder = ReturnType export type IRecordQueryBuilder = Kysely -export type Tx = Transaction -export type AnonymousTx = Transaction - export function json(value: T): RawBuilder { return sql`${JSON.stringify(value)}` } diff --git a/packages/persistence/src/qb.type.ts b/packages/persistence/src/qb.type.ts new file mode 100644 index 000000000..73a435d82 --- /dev/null +++ b/packages/persistence/src/qb.type.ts @@ -0,0 +1,5 @@ +import type { Transaction } from "kysely" +import type { Database } from "./db" + +export type Tx = Transaction +export type AnonymousTx = Transaction diff --git a/packages/persistence/src/record/record-query.helper.ts b/packages/persistence/src/record/record-query.helper.ts index 7d0274aa2..48ab42df4 100644 --- a/packages/persistence/src/record/record-query.helper.ts +++ b/packages/persistence/src/record/record-query.helper.ts @@ -3,7 +3,8 @@ import { singleton } from "@undb/di" import type { IPagination, Option } from "@undb/domain" import { FieldIdVo, type Field, type IViewSort, type RecordComositeSpecification, type TableDo } from "@undb/table" import { sql, type ExpressionBuilder, type SelectQueryBuilder } from "kysely" -import { getAnonymousTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IRecordQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { UnderlyingTable } from "../underlying/underlying-table" @@ -21,6 +22,8 @@ export class RecordQueryHelper { public readonly qb: IRecordQueryBuilder, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} createQueryCreator( @@ -29,7 +32,7 @@ export class RecordQueryHelper { visibleFields: Field[], spec: Option, ) { - const trx = getAnonymousTransaction() ?? this.qb + const trx = this.txContext.getAnonymousTransaction() let qb = new RecordQueryCreatorVisitor(trx, table, foreignTables, visibleFields).create() const visitor = new RecordQuerySpecCreatorVisitor(trx, qb, table) diff --git a/packages/persistence/src/record/record.outbox-service.ts b/packages/persistence/src/record/record.outbox-service.ts index 2821ac51c..370ce08b3 100644 --- a/packages/persistence/src/record/record.outbox-service.ts +++ b/packages/persistence/src/record/record.outbox-service.ts @@ -1,7 +1,8 @@ import { injectContext, type IContext } from "@undb/context" import { singleton } from "@undb/di" import type { IRecordOutboxService, RecordDO } from "@undb/table" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import { OutboxMapper } from "../outbox.mapper" @singleton() @@ -9,10 +10,12 @@ export class RecordOutboxService implements IRecordOutboxService { constructor( @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async save(r: RecordDO): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const values = r.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context)) if (!values.length) return await trx.insertInto("undb_outbox").values(values).execute() @@ -21,7 +24,7 @@ export class RecordOutboxService implements IRecordOutboxService { async saveMany(d: RecordDO[]): Promise { if (!d.length) return - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const values = d.flatMap((r) => r.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context))) await trx.insertInto("undb_outbox").values(values).execute() for (const r of d) { diff --git a/packages/persistence/src/record/record.repository.ts b/packages/persistence/src/record/record.repository.ts index 78aa419d9..fa4057aef 100644 --- a/packages/persistence/src/record/record.repository.ts +++ b/packages/persistence/src/record/record.repository.ts @@ -21,7 +21,8 @@ import { } from "@undb/table" import { chunk } from "es-toolkit/array" import { sql, type CompiledQuery, type ExpressionBuilder } from "kysely" -import { getAnonymousTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import { UnderlyingTable } from "../underlying/underlying-table" import { RecordQueryHelper } from "./record-query.helper" import { getRecordDTOFromEntity } from "./record-utils" @@ -41,6 +42,8 @@ export class RecordRepository implements IRecordRepository { private readonly helper: RecordQueryHelper, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} private async getForeignTables(table: TableDo, fields: Field[]): Promise> { @@ -56,7 +59,7 @@ export class RecordRepository implements IRecordRepository { } async insert(table: TableDo, record: RecordDO): Promise { - const trx = getAnonymousTransaction() + const trx = this.txContext.getAnonymousTransaction() const context = executionContext.getStore() const userId = context?.user?.userId! @@ -88,7 +91,7 @@ export class RecordRepository implements IRecordRepository { } async #bulkInsert(table: TableDo, records: RecordDO[]): Promise { - const trx = getAnonymousTransaction() + const trx = this.txContext.getAnonymousTransaction() const context = executionContext.getStore() const userId = context?.user?.userId! @@ -202,7 +205,7 @@ export class RecordRepository implements IRecordRepository { async updateOneById(table: TableDo, record: RecordDO, spec: Option): Promise { if (spec.isNone()) return - const trx = getAnonymousTransaction() + const trx = this.txContext.getAnonymousTransaction() const context = executionContext.getStore() const userId = context?.user?.userId! @@ -237,7 +240,7 @@ export class RecordRepository implements IRecordRepository { update: RecordComositeSpecification, records: RecordDO[], ): Promise { - const trx = getAnonymousTransaction() + const trx = this.txContext.getAnonymousTransaction() const context = executionContext.getStore() const userId = context?.user?.userId! @@ -297,14 +300,14 @@ export class RecordRepository implements IRecordRepository { async deleteOneById(table: TableDo, record: RecordDO): Promise { const t = new UnderlyingTable(table) - const trx = getAnonymousTransaction() + const trx = this.txContext.getAnonymousTransaction() await trx.deleteFrom(t.name).where(ID_TYPE, "=", record.id.value).executeTakeFirst() await this.outboxService.save(record) } async deleteByIds(table: TableDo, records: RecordDO[]): Promise { const t = new UnderlyingTable(table) - const trx = getAnonymousTransaction() + const trx = this.txContext.getAnonymousTransaction() const ids = records.map((r) => r.id.value) await trx.deleteFrom(t.name).where(ID_TYPE, "in", ids).executeTakeFirst() await this.outboxService.saveMany(records) diff --git a/packages/persistence/src/server.ts b/packages/persistence/src/server.ts index 004e368f9..8a5748360 100644 --- a/packages/persistence/src/server.ts +++ b/packages/persistence/src/server.ts @@ -18,6 +18,8 @@ export * from "./user" export * from "./webhook" export { type Client } from "@libsql/client" +export * from "./ctx.interface" +export * from "./ctx.provider" export { SQLITE_CLIENT, createSqliteClient, createTursoClient, injectSqliteClient } from "./db-client" export { type IQueryBuilder } from "./qb" export { injectQueryBuilder } from "./qb.provider" diff --git a/packages/persistence/src/share/share.repository.ts b/packages/persistence/src/share/share.repository.ts index e2b097ddd..f3fd92435 100644 --- a/packages/persistence/src/share/share.repository.ts +++ b/packages/persistence/src/share/share.repository.ts @@ -1,7 +1,8 @@ import { inject, singleton } from "@undb/di" import { None, Some, type Option } from "@undb/domain" import { WithShareId, type IShareRepository, type Share, type ShareSpecification } from "@undb/share" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { ShareFilterVisitor } from "./share.filter-visitor" @@ -14,6 +15,8 @@ export class ShareRepository implements IShareRepository { private readonly mapper: ShareMapper, @injectQueryBuilder() private readonly qb: IQueryBuilder, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} insert(share: Share): Promise { throw new Error("Method not implemented.") @@ -21,7 +24,8 @@ export class ShareRepository implements IShareRepository { async updateOneById(share: Share, spec: ShareSpecification): Promise { const entity = this.mapper.toEntity(share) - await (getCurrentTransaction() ?? this.qb) + await this.txContext + .getCurrentTransaction() .insertInto("undb_share") .values(entity) .onConflict((ob) => ob.columns(["target_id", "target_type"]).doUpdateSet({ enabled: share.enabled })) @@ -30,7 +34,8 @@ export class ShareRepository implements IShareRepository { async findOneById(id: string): Promise> { const spec = WithShareId.fromString(id) - const share = await (getCurrentTransaction() ?? this.qb) + const share = await this.txContext + .getCurrentTransaction() .selectFrom("undb_share") .selectAll() .where((eb) => { @@ -47,7 +52,8 @@ export class ShareRepository implements IShareRepository { return Some(this.mapper.toDo(share)) } async findOne(spec: ShareSpecification): Promise> { - const share = await (getCurrentTransaction() ?? this.qb) + const share = await this.txContext + .getCurrentTransaction() .selectFrom("undb_share") .selectAll() .where((eb) => { diff --git a/packages/persistence/src/space/space.repository.ts b/packages/persistence/src/space/space.repository.ts index 20e51b9b8..80ae9929b 100644 --- a/packages/persistence/src/space/space.repository.ts +++ b/packages/persistence/src/space/space.repository.ts @@ -2,7 +2,8 @@ import { injectContext, type IContext } from "@undb/context" import { singleton } from "@undb/di" import { None, Some, type Option } from "@undb/domain" import { SpaceFactory, type ISpaceRepository, type ISpaceSpecification, type Space } from "@undb/space" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { SpaceFilterVisitor } from "./space.filter-visitor" @@ -15,9 +16,12 @@ export class SpaceRepostitory implements ISpaceRepository { private readonly qb: IQueryBuilder, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async find(spec: ISpaceSpecification): Promise { - const space = await (getCurrentTransaction() ?? this.qb) + const space = await this.txContext + .getCurrentTransaction() .selectFrom("undb_space") .selectAll() .where((eb) => { @@ -37,7 +41,8 @@ export class SpaceRepostitory implements ISpaceRepository { ) } async findOne(spec: ISpaceSpecification): Promise> { - const space = await (getCurrentTransaction() ?? this.qb) + const space = await this.txContext + .getCurrentTransaction() .selectFrom("undb_space") .selectAll() .where((eb) => new SpaceFilterVisitor(this.qb, eb).exec(Some(spec))) @@ -57,7 +62,8 @@ export class SpaceRepostitory implements ISpaceRepository { ) } async findOneById(id: string): Promise> { - const space = await (getCurrentTransaction() ?? this.qb) + const space = await this.txContext + .getCurrentTransaction() .selectFrom("undb_space") .selectAll() .where("undb_space.id", "=", id) @@ -78,7 +84,7 @@ export class SpaceRepostitory implements ISpaceRepository { ) } async insert(space: Space): Promise { - const tx = getCurrentTransaction() + const tx = this.txContext.getCurrentTransaction() const userId = this.context.getCurrentUserId() await tx .insertInto("undb_space") @@ -99,16 +105,16 @@ export class SpaceRepostitory implements ISpaceRepository { spec.accept(visitor) const userId = this.context.getCurrentUserId() - await getCurrentTransaction() + await this.txContext + .getCurrentTransaction() .updateTable("undb_space") .set({ ...visitor.data, updated_by: userId, updated_at: new Date().toISOString() }) .where((eb) => eb.and([eb.eb("id", "=", space.id.value), eb.eb("deleted_at", "is", null)])) .execute() } async deleteOneById(id: string): Promise { - const tx = getCurrentTransaction() - - await tx + await this.txContext + .getCurrentTransaction() .updateTable("undb_space") .set({ deleted_at: new Date().getTime(), deleted_by: this.context.getCurrentUserId() }) .where("id", "=", id) diff --git a/packages/persistence/src/table/table.outbox-service.ts b/packages/persistence/src/table/table.outbox-service.ts index 3f6c3b9f5..efb066245 100644 --- a/packages/persistence/src/table/table.outbox-service.ts +++ b/packages/persistence/src/table/table.outbox-service.ts @@ -3,7 +3,8 @@ import { EventBus } from "@undb/cqrs" import { inject, singleton } from "@undb/di" import type { IEventBus } from "@undb/domain" import type { ITableOutboxService, TableDo } from "@undb/table" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import { OutboxMapper } from "../outbox.mapper" @singleton() @@ -13,9 +14,11 @@ export class TableOutboxService implements ITableOutboxService { private readonly context: IContext, @inject(EventBus) private readonly eventBus: IEventBus, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async save(table: TableDo): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const values = table.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context)) if (!values.length) return @@ -26,7 +29,7 @@ export class TableOutboxService implements ITableOutboxService { } async saveMany(d: TableDo[]): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const values = d.flatMap((table) => table.domainEvents.map((e) => OutboxMapper.fromEvent(e, this.context))) if (!values.length) return diff --git a/packages/persistence/src/table/table.repository.ts b/packages/persistence/src/table/table.repository.ts index 492685e8f..68de688bc 100644 --- a/packages/persistence/src/table/table.repository.ts +++ b/packages/persistence/src/table/table.repository.ts @@ -12,7 +12,8 @@ import { type TableDo, type TableId, } from "@undb/table" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { InsertTable, InsertTableIdMapping } from "../db" import { json, type IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" @@ -33,6 +34,8 @@ export class TableRepository implements ITableRepository { private readonly qb: IQueryBuilder, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} get mapper() { @@ -48,7 +51,7 @@ export class TableRepository implements ITableRepository { return } - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const ctx = executionContext.getStore() const userId = ctx!.user!.userId! @@ -69,7 +72,7 @@ export class TableRepository implements ITableRepository { } async insert(table: TableDo): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const ctx = executionContext.getStore() const userId = ctx!.user!.userId! @@ -176,7 +179,8 @@ export class TableRepository implements ITableRepository { } async find(spec: Option, ignoreSpace?: boolean): Promise { - const query = (getCurrentTransaction() ?? this.qb) + const query = this.txContext + .getCurrentTransaction() .selectFrom("undb_table") .selectAll("undb_table") .$if(spec.isSome(), (qb) => new TableReferenceVisitor(qb).call(spec.unwrap())) @@ -189,7 +193,8 @@ export class TableRepository implements ITableRepository { } async findOne(spec: Option): Promise> { - const tb = await (getCurrentTransaction() ?? this.qb) + const tb = await this.txContext + .getCurrentTransaction() .selectFrom("undb_table") .selectAll("undb_table") .$if(spec.isSome(), (qb) => new TableReferenceVisitor(qb).call(spec.unwrap())) @@ -205,7 +210,8 @@ export class TableRepository implements ITableRepository { async findOneById(id: TableId): Promise> { const spec = Some(new TableIdSpecification(id)) - const tb = await (getCurrentTransaction() ?? this.qb) + const tb = await this.txContext + .getCurrentTransaction() .selectFrom("undb_table") .selectAll("undb_table") .$call((qb) => new TableReferenceVisitor(qb).call(spec.unwrap())) @@ -217,7 +223,8 @@ export class TableRepository implements ITableRepository { async findManyByIds(ids: TableId[]): Promise { const spec = Some(new TableIdsSpecification(ids)) - const tbs = await (getCurrentTransaction() ?? this.qb) + const tbs = await this.txContext + .getCurrentTransaction() .selectFrom("undb_table") .selectAll("undb_table") .$call((qb) => new TableReferenceVisitor(qb).call(spec.unwrap())) @@ -228,7 +235,7 @@ export class TableRepository implements ITableRepository { } async deleteOneById(table: TableDo): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() await trx .deleteFrom("undb_table_id_mapping") .where((eb) => eb.eb("table_id", "=", table.id.value)) diff --git a/packages/persistence/src/underlying/underlying-table.service.ts b/packages/persistence/src/underlying/underlying-table.service.ts index 818cbb492..8425612d8 100644 --- a/packages/persistence/src/underlying/underlying-table.service.ts +++ b/packages/persistence/src/underlying/underlying-table.service.ts @@ -3,7 +3,8 @@ import { singleton } from "@undb/di" import { createLogger } from "@undb/logger" import type { TableComositeSpecification, TableDo } from "@undb/table" import type { CompiledQuery } from "kysely" -import { getAnonymousTransaction, getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import { JoinTable } from "./reference/join-table" import { UnderlyingTable } from "./underlying-table" import { UnderlyingTableFieldVisitor } from "./underlying-table-field.visitor" @@ -11,13 +12,17 @@ import { UnderlyingTableSpecVisitor } from "./underlying-table-spec.visitor" @singleton() export class UnderlyingTableService { - constructor(@injectContext() private readonly context: IContext) {} + constructor( + @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, + ) {} readonly logger = createLogger(UnderlyingTableService.name) async create(table: TableDo) { const t = new UnderlyingTable(table) - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() const sql: CompiledQuery[] = [] await trx.schema .createTable(t.name) @@ -39,7 +44,7 @@ export class UnderlyingTableService { async update(table: TableDo, spec: TableComositeSpecification) { const t = new UnderlyingTable(table) - const trx = getAnonymousTransaction() + const trx = this.txContext.getAnonymousTransaction() const visitor = new UnderlyingTableSpecVisitor(t, trx, this.context) spec.accept(visitor) @@ -49,7 +54,7 @@ export class UnderlyingTableService { async delete(table: TableDo) { const t = new UnderlyingTable(table) - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() await trx.schema.dropTable(t.name).ifExists().execute() const referenceFields = table.schema.getReferenceFields() for (const field of referenceFields) { diff --git a/packages/persistence/src/user/user.repository.ts b/packages/persistence/src/user/user.repository.ts index cc84dbcf3..b9d767ccc 100644 --- a/packages/persistence/src/user/user.repository.ts +++ b/packages/persistence/src/user/user.repository.ts @@ -1,14 +1,19 @@ import { singleton } from "@undb/di" import type { IUser, IUserRepository } from "@undb/user" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" @singleton() export class UserRepository implements IUserRepository { + constructor( + @injectTxCTX() + private readonly txContext: ITxContext, + ) {} insert(user: IUser): Promise { throw new Error("Method not implemented.") } async updateOneById(userId: string, user: IUser): Promise { - const trx = getCurrentTransaction() + const trx = this.txContext.getCurrentTransaction() await trx.updateTable("undb_user").set({ username: user.username }).where("id", "=", userId).execute() } } diff --git a/packages/persistence/src/webhook/webhook.repository.ts b/packages/persistence/src/webhook/webhook.repository.ts index f1d73b69a..6fafa0583 100644 --- a/packages/persistence/src/webhook/webhook.repository.ts +++ b/packages/persistence/src/webhook/webhook.repository.ts @@ -2,7 +2,8 @@ import { injectContext, type IContext } from "@undb/context" import { inject, singleton } from "@undb/di" import { None, Some, type Option } from "@undb/domain" import { type IWebhookRepository, type WebhookDo, type WebhookSpecification } from "@undb/webhook" -import { getCurrentTransaction } from "../ctx" +import type { ITxContext } from "../ctx.interface" +import { injectTxCTX } from "../ctx.provider" import type { IQueryBuilder } from "../qb" import { injectQueryBuilder } from "../qb.provider" import { WebhookFilterVisitor } from "./webhook.filter-visitor" @@ -18,10 +19,13 @@ export class WebhookRepository implements IWebhookRepository { private readonly qb: IQueryBuilder, @injectContext() private readonly context: IContext, + @injectTxCTX() + private readonly txContext: ITxContext, ) {} async findOneById(id: string): Promise> { - const wb = await (getCurrentTransaction() ?? this.qb) + const wb = await this.txContext + .getCurrentTransaction() .selectFrom("undb_webhook") .selectAll() .where((eb) => eb.eb("id", "=", id)) @@ -35,7 +39,8 @@ export class WebhookRepository implements IWebhookRepository { } async find(spec: WebhookSpecification): Promise { - const wb = await (getCurrentTransaction() ?? this.qb) + const wb = await this.txContext + .getCurrentTransaction() .selectFrom("undb_webhook") .selectAll() .where((eb) => { @@ -51,14 +56,15 @@ export class WebhookRepository implements IWebhookRepository { async insert(webhook: WebhookDo): Promise { const values = this.mapper.toEntity(webhook) - await (getCurrentTransaction() ?? this.qb).insertInto("undb_webhook").values(values).execute() + await this.txContext.getCurrentTransaction().insertInto("undb_webhook").values(values).execute() } async updateOneById(webhook: WebhookDo, spec: WebhookSpecification): Promise { const visitor = new WebhookMutationVisitor() spec.accept(visitor) - await (getCurrentTransaction() ?? this.qb) + await this.txContext + .getCurrentTransaction() .updateTable("undb_webhook") .set(visitor.data) .where((eb) => eb.eb("id", "=", webhook.id.value)) @@ -66,6 +72,6 @@ export class WebhookRepository implements IWebhookRepository { } async deleteOneById(id: string): Promise { - await (getCurrentTransaction() ?? this.qb).deleteFrom("undb_webhook").where("undb_webhook.id", "=", id).execute() + await this.txContext.getCurrentTransaction().deleteFrom("undb_webhook").where("undb_webhook.id", "=", id).execute() } } diff --git a/packages/trpc/src/trpc.ts b/packages/trpc/src/trpc.ts index d1c48121a..77045c0fe 100644 --- a/packages/trpc/src/trpc.ts +++ b/packages/trpc/src/trpc.ts @@ -4,7 +4,7 @@ import { initTRPC, TRPCError } from "@trpc/server" import { executionContext, getCurrentUserId } from "@undb/context/server" import { container } from "@undb/di" import { createLogger } from "@undb/logger" -import { QUERY_BUILDER, startTransaction, type IQueryBuilder } from "@undb/persistence/server" +import { QUERY_BUILDER, TX_CTX, type IQueryBuilder, type ITxContext } from "@undb/persistence/server" import { ZodError } from "@undb/zod" import { fromZodError } from "zod-validation-error" import pkg from "../package.json" @@ -51,11 +51,12 @@ export const p = t.procedure .use(async (ctx) => { if (ctx.type === "mutation") { const qb = container.resolve(QUERY_BUILDER) + const txContext = container.resolve(TX_CTX) return await qb .transaction() .setIsolationLevel("read committed") .execute(async (tx) => { - startTransaction(tx) + txContext.startTransaction(tx) const result = await ctx.next() return result