Skip to content

Commit d95cab5

Browse files
authored
feat(spanner): add support for change streams transaction exclusion option (#2049)
Add support for excluding transactions from being recorded in the change streams by passing a new boolean option ExcludeTxnFromChangeStreams in the various write APIs: `runTransaction` `getTransaction` `runPartitionedUpdate` `_mutate` Note: Samples will be added later in separate prs.
1 parent 628f4b0 commit d95cab5

File tree

6 files changed

+343
-13
lines changed

6 files changed

+343
-13
lines changed

src/database.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ export interface SetIamPolicyRequest {
142142
updateMask?: FieldMask | null;
143143
}
144144

145+
export interface RunPartitionedUpdateOptions extends ExecuteSqlRequest {
146+
excludeTxnFromChangeStreams?: boolean;
147+
}
148+
145149
export type UpdateSchemaCallback = ResourceCallback<
146150
GaxOperation,
147151
databaseAdmin.longrunning.IOperation
@@ -2092,6 +2096,9 @@ class Database extends common.GrpcServiceObject {
20922096
if (options.optimisticLock) {
20932097
transaction!.useOptimisticLock();
20942098
}
2099+
if (options.excludeTxnFromChangeStreams) {
2100+
transaction!.excludeTxnFromChangeStreams();
2101+
}
20952102
if (!err) {
20962103
this._releaseOnEnd(session!, transaction!);
20972104
}
@@ -2711,13 +2718,15 @@ class Database extends common.GrpcServiceObject {
27112718
* @param {RunUpdateCallback} [callback] Callback function.
27122719
* @returns {Promise<RunUpdateResponse>}
27132720
*/
2714-
runPartitionedUpdate(query: string | ExecuteSqlRequest): Promise<[number]>;
27152721
runPartitionedUpdate(
2716-
query: string | ExecuteSqlRequest,
2722+
query: string | RunPartitionedUpdateOptions
2723+
): Promise<[number]>;
2724+
runPartitionedUpdate(
2725+
query: string | RunPartitionedUpdateOptions,
27172726
callback?: RunUpdateCallback
27182727
): void;
27192728
runPartitionedUpdate(
2720-
query: string | ExecuteSqlRequest,
2729+
query: string | RunPartitionedUpdateOptions,
27212730
callback?: RunUpdateCallback
27222731
): void | Promise<[number]> {
27232732
this.pool_.getSession((err, session) => {
@@ -2732,11 +2741,14 @@ class Database extends common.GrpcServiceObject {
27322741

27332742
_runPartitionedUpdate(
27342743
session: Session,
2735-
query: string | ExecuteSqlRequest,
2744+
query: string | RunPartitionedUpdateOptions,
27362745
callback?: RunUpdateCallback
27372746
): void | Promise<number> {
27382747
const transaction = session.partitionedDml();
27392748

2749+
if (typeof query !== 'string' && query.excludeTxnFromChangeStreams) {
2750+
transaction.excludeTxnFromChangeStreams();
2751+
}
27402752
transaction.begin(err => {
27412753
if (err) {
27422754
this.pool_.release(session!);
@@ -3059,6 +3071,9 @@ class Database extends common.GrpcServiceObject {
30593071
if (options.optimisticLock) {
30603072
transaction!.useOptimisticLock();
30613073
}
3074+
if (options.excludeTxnFromChangeStreams) {
3075+
transaction!.excludeTxnFromChangeStreams();
3076+
}
30623077

30633078
const release = this.pool_.release.bind(this.pool_, session!);
30643079
const runner = new TransactionRunner(
@@ -3173,6 +3188,9 @@ class Database extends common.GrpcServiceObject {
31733188
if (options.optimisticLock) {
31743189
transaction.useOptimisticLock();
31753190
}
3191+
if (options.excludeTxnFromChangeStreams) {
3192+
transaction.excludeTxnFromChangeStreams();
3193+
}
31763194
const runner = new AsyncTransactionRunner<T>(
31773195
session,
31783196
transaction,

src/table.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export type DropTableCallback = UpdateSchemaCallback;
4646

4747
interface MutateRowsOptions extends CommitOptions {
4848
requestOptions?: Omit<IRequestOptions, 'requestTag'>;
49+
excludeTxnFromChangeStreams?: boolean;
4950
}
5051

5152
export type DeleteRowsCallback = CommitCallback;
@@ -1073,15 +1074,27 @@ class Table {
10731074
): void {
10741075
const requestOptions =
10751076
'requestOptions' in options ? options.requestOptions : {};
1076-
this.database.runTransaction({requestOptions}, (err, transaction) => {
1077-
if (err) {
1078-
callback(err);
1079-
return;
1080-
}
10811077

1082-
transaction![method](this.name, rows as Key[]);
1083-
transaction!.commit(options, callback);
1084-
});
1078+
const excludeTxnFromChangeStreams =
1079+
'excludeTxnFromChangeStreams' in options
1080+
? options.excludeTxnFromChangeStreams
1081+
: false;
1082+
1083+
this.database.runTransaction(
1084+
{
1085+
requestOptions: requestOptions,
1086+
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
1087+
},
1088+
(err, transaction) => {
1089+
if (err) {
1090+
callback(err);
1091+
return;
1092+
}
1093+
1094+
transaction![method](this.name, rows as Key[]);
1095+
transaction!.commit(options, callback);
1096+
}
1097+
);
10851098
}
10861099
}
10871100

src/transaction-runner.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export interface RunTransactionOptions {
4545
timeout?: number;
4646
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
4747
optimisticLock?: boolean;
48+
excludeTxnFromChangeStreams?: boolean;
4849
}
4950

5051
/**
@@ -204,6 +205,9 @@ export abstract class Runner<T> {
204205
if (this.options.optimisticLock) {
205206
transaction.useOptimisticLock();
206207
}
208+
if (this.options.excludeTxnFromChangeStreams) {
209+
transaction.excludeTxnFromChangeStreams();
210+
}
207211
if (this.attempts > 0) {
208212
await transaction.begin();
209213
}

src/transaction.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2470,6 +2470,18 @@ export class Transaction extends Dml {
24702470
useOptimisticLock(): void {
24712471
this._options.readWrite!.readLockMode = ReadLockMode.OPTIMISTIC;
24722472
}
2473+
2474+
/**
2475+
* Use option excludeTxnFromChangeStreams to exclude read/write transactions
2476+
* from being tracked in change streams.
2477+
*
2478+
* Enabling this options to true will effectively disable change stream tracking
2479+
* for a specified transaction, allowing read/write transaction to operate without being
2480+
* included in change streams.
2481+
*/
2482+
excludeTxnFromChangeStreams(): void {
2483+
this._options.excludeTxnFromChangeStreams = true;
2484+
}
24732485
}
24742486

24752487
/*! Developer Documentation
@@ -2503,6 +2515,17 @@ export class PartitionedDml extends Dml {
25032515
super(session);
25042516
this._options = {partitionedDml: options};
25052517
}
2518+
/**
2519+
* Use option excludeTxnFromChangeStreams to exclude partitionedDml
2520+
* queries from being tracked in change streams.
2521+
*
2522+
* Enabling this options to true will effectively disable change stream tracking
2523+
* for a specified partitionedDml query, allowing write queries to operate
2524+
* without being included in change streams.
2525+
*/
2526+
excludeTxnFromChangeStreams(): void {
2527+
this._options.excludeTxnFromChangeStreams = true;
2528+
}
25062529

25072530
/**
25082531
* Execute a DML statement and get the affected row count. Unlike

test/database.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2544,7 +2544,8 @@ describe('Database', () => {
25442544

25452545
const [query] = runUpdateStub.lastCall.args;
25462546

2547-
assert.strictEqual(query, QUERY);
2547+
assert.strictEqual(query.sql, QUERY.sql);
2548+
assert.deepStrictEqual(query.params, QUERY.params);
25482549
assert.ok(fakeCallback.calledOnce);
25492550
});
25502551

@@ -2581,6 +2582,24 @@ describe('Database', () => {
25812582
assert.ok(fakeCallback.calledOnce);
25822583
});
25832584

2585+
it('should accept excludeTxnFromChangeStreams', () => {
2586+
const fakeCallback = sandbox.spy();
2587+
2588+
database.runPartitionedUpdate(
2589+
{
2590+
excludeTxnFromChangeStream: true,
2591+
},
2592+
fakeCallback
2593+
);
2594+
2595+
const [query] = runUpdateStub.lastCall.args;
2596+
2597+
assert.deepStrictEqual(query, {
2598+
excludeTxnFromChangeStream: true,
2599+
});
2600+
assert.ok(fakeCallback.calledOnce);
2601+
});
2602+
25842603
it('should ignore directedReadOptions set for client', () => {
25852604
const fakeCallback = sandbox.spy();
25862605

0 commit comments

Comments
 (0)