Skip to content

Commit b6bd618

Browse files
committed
Addec connection factory and extended transaction with connection information
1 parent 87c058e commit b6bd618

File tree

7 files changed

+144
-57
lines changed

7 files changed

+144
-57
lines changed

src/packages/dumbo/src/core/connections/connection.ts

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
1-
import type { WithSQLExecutor } from '../execute';
2-
import type { DatabaseTransactionFactory } from './transaction';
1+
import {
2+
sqlExecutor,
3+
type DbSQLExecutor,
4+
type WithSQLExecutor,
5+
} from '../execute';
6+
import {
7+
transactionFactoryWithDbClient,
8+
type DatabaseTransaction,
9+
type DatabaseTransactionFactory,
10+
} from './transaction';
311

412
export interface Connection<
513
ConnectorType extends string = string,
@@ -20,3 +28,59 @@ export interface ConnectionFactory<
2028
handle: (connection: ConnectionType) => Promise<Result>,
2129
) => Promise<Result>;
2230
}
31+
32+
export type CreateConnectionOptions<
33+
ConnectorType extends string = string,
34+
DbClient = unknown,
35+
ConnectionType extends Connection<ConnectorType, DbClient> = Connection<
36+
ConnectorType,
37+
DbClient
38+
>,
39+
Executor extends DbSQLExecutor = DbSQLExecutor,
40+
> = {
41+
type: ConnectorType;
42+
connect: Promise<DbClient>;
43+
close: (client: DbClient) => Promise<void>;
44+
initTransaction: (
45+
connection: () => ConnectionType,
46+
) => (client: Promise<DbClient>) => DatabaseTransaction<ConnectorType>;
47+
executor: () => Executor;
48+
};
49+
50+
export const createConnection = <
51+
ConnectorType extends string = string,
52+
DbClient = unknown,
53+
ConnectionType extends Connection<ConnectorType, DbClient> = Connection<
54+
ConnectorType,
55+
DbClient
56+
>,
57+
Executor extends DbSQLExecutor = DbSQLExecutor,
58+
>(
59+
options: CreateConnectionOptions<
60+
ConnectorType,
61+
DbClient,
62+
ConnectionType,
63+
Executor
64+
>,
65+
): ConnectionType => {
66+
const { type, connect, close, initTransaction, executor } = options;
67+
68+
let client: DbClient | null = null;
69+
70+
const getClient = async () => client ?? (client = await connect);
71+
72+
const connection: Connection<ConnectorType, DbClient> = {
73+
type: type,
74+
open: getClient,
75+
close: () => (client ? close(client) : Promise.resolve()),
76+
...transactionFactoryWithDbClient(
77+
getClient,
78+
initTransaction(() => typedConnection),
79+
),
80+
execute: sqlExecutor(executor(), { connect: getClient }),
81+
};
82+
83+
const typedConnection = connection as ConnectionType;
84+
85+
return typedConnection;
86+
};

src/packages/dumbo/src/core/connections/transaction.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import type { WithSQLExecutor } from '../execute';
22
import { type Connection } from './connection';
33

4-
export interface DatabaseTransaction<ConnectorType extends string = string>
5-
extends WithSQLExecutor {
4+
export interface DatabaseTransaction<
5+
ConnectorType extends string = string,
6+
DbClient = unknown,
7+
> extends WithSQLExecutor {
68
type: ConnectorType;
9+
connection: Connection<ConnectorType, DbClient>;
710
begin: () => Promise<void>;
811
commit: () => Promise<void>;
912
rollback: (error?: unknown) => Promise<void>;

src/packages/dumbo/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ export type PoolOptions = {
1414
connector?: ConnectorType;
1515
};
1616

17-
export type DumboOptions = PoolOptions;
17+
export type DumboOptions = PoolOptions & PostgresPoolOptions;
1818
export type Dumbo = PostgresPool;
1919

20-
export const connectionPool = <PoolOptionsType extends PoolOptions>(
20+
export const connectionPool = <PoolOptionsType extends DumboOptions>(
2121
options: PoolOptionsType,
2222
) =>
2323
// TODO: this should have the pattern matching and verification

src/packages/dumbo/src/postgres/pg/connections/connection.ts

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
import pg from 'pg';
2-
import {
3-
sqlExecutor,
4-
transactionFactoryWithDbClient,
5-
type Connection,
6-
} from '../../../core';
2+
import { createConnection, type Connection } from '../../../core';
73
import { nodePostgresSQLExecutor } from '../execute';
84
import { nodePostgresTransaction } from './transaction';
95

@@ -45,35 +41,27 @@ export const nodePostgresClientConnection = (
4541
): NodePostgresClientConnection => {
4642
const { connect, close } = options;
4743

48-
let client: pg.Client | null = null;
49-
50-
const getClient = async () => client ?? (client = await connect);
51-
52-
return {
44+
return createConnection({
5345
type: NodePostgresConnectorType,
54-
open: getClient,
55-
close: () => (client ? close(client) : Promise.resolve()),
56-
...transactionFactoryWithDbClient(getClient, nodePostgresTransaction),
57-
execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: getClient }),
58-
};
46+
connect,
47+
close,
48+
initTransaction: (connection) => nodePostgresTransaction(connection),
49+
executor: nodePostgresSQLExecutor,
50+
});
5951
};
6052

6153
export const nodePostgresPoolClientConnection = (
6254
options: NodePostgresPoolClientOptions,
6355
): NodePostgresPoolClientConnection => {
6456
const { connect, close } = options;
6557

66-
let client: pg.PoolClient | null = null;
67-
68-
const getClient = async () => client ?? (client = await connect);
69-
70-
return {
58+
return createConnection({
7159
type: NodePostgresConnectorType,
72-
open: getClient,
73-
close: () => (client ? close(client) : Promise.resolve()),
74-
...transactionFactoryWithDbClient(getClient, nodePostgresTransaction),
75-
execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: getClient }),
76-
};
60+
connect,
61+
close,
62+
initTransaction: (connection) => nodePostgresTransaction(connection),
63+
executor: nodePostgresSQLExecutor,
64+
});
7765
};
7866

7967
export function nodePostgresConnection(
Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import { sqlExecutor, type DatabaseTransaction } from '../../../core';
1+
import {
2+
sqlExecutor,
3+
type Connection,
4+
type DatabaseTransaction,
5+
} from '../../../core';
26
import { nodePostgresSQLExecutor } from '../execute';
37
import {
48
NodePostgresConnectorType,
@@ -9,29 +13,34 @@ import {
913
export type NodePostgresTransaction =
1014
DatabaseTransaction<NodePostgresConnector>;
1115

12-
export const nodePostgresTransaction = <
13-
DbClient extends NodePostgresPoolOrClient = NodePostgresPoolOrClient,
14-
>(
15-
getClient: Promise<DbClient>,
16-
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
17-
): DatabaseTransaction<NodePostgresConnector> => ({
18-
type: NodePostgresConnectorType,
19-
begin: async () => {
20-
const client = await getClient;
21-
await client.query('BEGIN');
22-
},
23-
commit: async () => {
24-
const client = await getClient;
16+
export const nodePostgresTransaction =
17+
<DbClient extends NodePostgresPoolOrClient = NodePostgresPoolOrClient>(
18+
connection: () => Connection<NodePostgresConnector, DbClient>,
19+
) =>
20+
(
21+
getClient: Promise<DbClient>,
22+
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
23+
): DatabaseTransaction<NodePostgresConnector> => ({
24+
connection: connection(),
25+
type: NodePostgresConnectorType,
26+
begin: async () => {
27+
const client = await getClient;
28+
await client.query('BEGIN');
29+
},
30+
commit: async () => {
31+
const client = await getClient;
2532

26-
await client.query('COMMIT');
33+
await client.query('COMMIT');
2734

28-
if (options?.close) await options?.close(client);
29-
},
30-
rollback: async (error?: unknown) => {
31-
const client = await getClient;
32-
await client.query('ROLLBACK');
35+
if (options?.close) await options?.close(client);
36+
},
37+
rollback: async (error?: unknown) => {
38+
const client = await getClient;
39+
await client.query('ROLLBACK');
3340

34-
if (options?.close) await options?.close(client, error);
35-
},
36-
execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: () => getClient }),
37-
});
41+
if (options?.close) await options?.close(client, error);
42+
},
43+
execute: sqlExecutor(nodePostgresSQLExecutor(), {
44+
connect: () => getClient,
45+
}),
46+
});

src/packages/pongo/src/core/pongoClient.connections.e2e.spec.ts renamed to src/packages/pongo/src/core/pongoClient.connections.int.spec.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { isNodePostgresNativePool } from '@event-driven-io/dumbo';
1+
import { dumbo, isNodePostgresNativePool } from '@event-driven-io/dumbo';
22
import {
33
PostgreSqlContainer,
44
type StartedPostgreSqlContainer,
@@ -81,5 +81,21 @@ void describe('Pongo collection', () => {
8181
await client.end();
8282
}
8383
});
84+
85+
void it('connects using existing connection client', async () => {
86+
const pool = dumbo({ connectionString });
87+
88+
try {
89+
await pool.withTransaction(async ({ connection }) => {
90+
const pongo = pongoClient(connectionString, { connection });
91+
92+
const users = pongo.db().collection<User>('connections');
93+
await users.insertOne({ name: randomUUID() });
94+
await users.insertOne({ name: randomUUID() });
95+
});
96+
} finally {
97+
await pool.close();
98+
}
99+
});
84100
});
85101
});

src/packages/pongo/src/core/pongoClient.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
import { NodePostgresConnectorType } from '@event-driven-io/dumbo';
1+
import {
2+
NodePostgresConnectorType,
3+
type NodePostgresConnection,
4+
} from '@event-driven-io/dumbo';
25
import pg from 'pg';
36
import type { PostgresDbClientOptions } from '../postgres';
47
import { getPongoDb, type AllowedDbClientOptions } from './pongoDb';
@@ -29,6 +32,10 @@ export type NotPooledPongoOptions =
2932
| {
3033
client: pg.Client;
3134
pooled: false;
35+
}
36+
| {
37+
connection: NodePostgresConnection;
38+
pooled?: false;
3239
};
3340

3441
export type PongoClientOptions =

0 commit comments

Comments
 (0)