Skip to content

Commit 9245c37

Browse files
committed
Added span in partialresultStream
1 parent 430a33c commit 9245c37

File tree

9 files changed

+53
-37
lines changed

9 files changed

+53
-37
lines changed

observability-test/database.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ describe('Database', () => {
682682
'Expected that secondRetrySpan is the child to parentSpan'
683683
);
684684

685-
const expectedEventNames = ['No session available'];
685+
const expectedEventNames = ['No session available', 'Using Session'];
686686
assert.deepStrictEqual(
687687
actualEventNames,
688688
expectedEventNames,
@@ -1558,7 +1558,7 @@ describe('Database', () => {
15581558
);
15591559

15601560
// We don't expect events.
1561-
const expectedEventNames = [];
1561+
const expectedEventNames = ['Using Session'];
15621562
assert.deepStrictEqual(
15631563
actualEventNames,
15641564
expectedEventNames,

observability-test/helper.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export const cacheSessionEvents = [
3636
'Acquiring session',
3737
'Cache hit: has usable session',
3838
'Acquired session',
39+
'Using Session',
3940
];
4041

4142
/**
@@ -82,14 +83,25 @@ export async function verifySpansAndEvents(
8283
actualEventNames.push(event.name);
8384
});
8485
});
86+
87+
assert.strictEqual(
88+
actualSpanNames.length,
89+
expectedSpans.length,
90+
`Span count mismatch: Expected ${expectedSpans.length} spans, but received ${actualSpanNames.length} spans`
91+
);
8592
assert.deepStrictEqual(
8693
actualSpanNames,
8794
expectedSpans,
8895
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpans}`
8996
);
97+
assert.strictEqual(
98+
actualEventNames.length,
99+
expectedEvents.length,
100+
`Event count mismatch: Expected ${expectedEvents.length} events, but received ${actualEventNames.length} events`
101+
);
90102
assert.deepStrictEqual(
91-
actualEventNames,
92-
expectedEvents,
103+
actualEventNames.sort(),
104+
expectedEvents.sort(),
93105
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEvents}`
94106
);
95107
}

observability-test/spanner.ts

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ describe('EndToEnd', async () => {
225225
transaction!.commit();
226226

227227
const expectedSpanNames = ['CloudSpanner.Database.getTransaction'];
228-
const expectedEventNames = [...cacheSessionEvents, 'Using Session'];
228+
const expectedEventNames = [...cacheSessionEvents];
229229
await verifySpansAndEvents(
230230
traceExporter,
231231
expectedSpanNames,
@@ -245,11 +245,7 @@ describe('EndToEnd', async () => {
245245
'CloudSpanner.Snapshot.runStream',
246246
'CloudSpanner.Database.runStream',
247247
];
248-
const expectedEventNames = [
249-
'Starting stream',
250-
...cacheSessionEvents,
251-
'Using Session',
252-
];
248+
const expectedEventNames = ['Starting stream', ...cacheSessionEvents];
253249
await verifySpansAndEvents(
254250
traceExporter,
255251
expectedSpanNames,
@@ -267,11 +263,7 @@ describe('EndToEnd', async () => {
267263
'CloudSpanner.Database.runStream',
268264
'CloudSpanner.Database.run',
269265
];
270-
const expectedEventNames = [
271-
'Starting stream',
272-
...cacheSessionEvents,
273-
'Using Session',
274-
];
266+
const expectedEventNames = ['Starting stream', ...cacheSessionEvents];
275267
await verifySpansAndEvents(
276268
traceExporter,
277269
expectedSpanNames,
@@ -319,7 +311,6 @@ describe('EndToEnd', async () => {
319311
const expectedEventNames = [
320312
'Starting stream',
321313
...cacheSessionEvents,
322-
'Using Session',
323314
'Transaction Creation Done',
324315
];
325316
await verifySpansAndEvents(
@@ -350,22 +341,21 @@ describe('EndToEnd', async () => {
350341
'CloudSpanner.Database.batchCreateSessions',
351342
'CloudSpanner.SessionPool.createSessions',
352343
'CloudSpanner.Snapshot.runStream',
344+
'CloudSpanner.Snapshot.begin',
353345
'CloudSpanner.Snapshot.runStream',
354346
'CloudSpanner.Transaction.commit',
355-
'CloudSpanner.Snapshot.begin',
356347
'CloudSpanner.Database.runTransaction',
357348
];
358349
const expectedEventNames = [
359-
...waitingSessionsEvents,
360-
'Retrying Transaction',
350+
...batchCreateSessionsEvents,
361351
'Starting stream',
362-
'exception',
363-
'Stream broken. Not safe to retry',
364352
'Begin Transaction',
365353
'Transaction Creation Done',
366354
'Starting stream',
367355
'Starting Commit',
368356
'Commit Done',
357+
...waitingSessionsEvents,
358+
'Retrying transaction',
369359
];
370360
await verifySpansAndEvents(
371361
traceExporter,
@@ -381,7 +371,7 @@ describe('EndToEnd', async () => {
381371
});
382372
});
383373

384-
it.skip('runTransactionAsync with abort', async () => {
374+
it('runTransactionAsync with abort', async () => {
385375
let attempts = 0;
386376
const database = newTestDatabase();
387377
await database.runTransactionAsync((transaction): Promise<number> => {
@@ -406,11 +396,8 @@ describe('EndToEnd', async () => {
406396
'CloudSpanner.Database.runTransactionAsync',
407397
];
408398
const expectedEventNames = [
409-
'Requesting 25 sessions',
410-
'Creating 25 sessions',
411-
'Requested for 25 sessions returned 25',
399+
...batchCreateSessionsEvents,
412400
'Starting stream',
413-
'exception',
414401
'Stream broken. Not safe to retry',
415402
'Begin Transaction',
416403
'Transaction Creation Done',
@@ -441,7 +428,6 @@ describe('EndToEnd', async () => {
441428
'Starting Commit',
442429
'Commit Done',
443430
...cacheSessionEvents,
444-
'Using Session',
445431
];
446432
await verifySpansAndEvents(
447433
traceExporter,
@@ -639,7 +625,6 @@ describe('ObservabilityOptions injection and propagation', async () => {
639625

640626
const expectedEventNames = [
641627
...cacheSessionEvents,
642-
'Using Session',
643628
'Starting stream',
644629
'Transaction Creation Done',
645630
];
@@ -747,7 +732,6 @@ describe('ObservabilityOptions injection and propagation', async () => {
747732

748733
const expectedEventNames = [
749734
...cacheSessionEvents,
750-
'Using Session',
751735
'Starting stream',
752736
];
753737
assert.deepStrictEqual(

src/database.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2110,14 +2110,14 @@ class Database extends common.GrpcServiceObject {
21102110
span.end();
21112111
this.getSnapshot(options, callback!);
21122112
} else {
2113-
span.addEvent('Using Session', {'session.id': session?.id});
21142113
this.pool_.release(session!);
21152114
span.end();
21162115
callback!(err);
21172116
}
21182117
return;
21192118
}
21202119

2120+
span.addEvent('Using Session', {'session.id': session?.id});
21212121
this._releaseOnEnd(session!, snapshot, span);
21222122
span.end();
21232123
callback!(err, snapshot);
@@ -3244,6 +3244,7 @@ class Database extends common.GrpcServiceObject {
32443244
return;
32453245
}
32463246

3247+
span.addEvent('Using Session', {'session.id': session?.id});
32473248
transaction!._observabilityOptions = this._observabilityOptions;
32483249
if (options.optimisticLock) {
32493250
transaction!.useOptimisticLock();

src/instrument.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,18 @@ export function setSpanError(span: Span, err: Error | String): boolean {
200200
return true;
201201
}
202202

203+
/**
204+
* Sets the span status with err and end, if non-null onto the span with
205+
* status.code=ERROR and the message of err.toString()
206+
*
207+
* @returns {boolean} to signify if the status was set.
208+
*/
209+
export function setSpanErrorAndEnd(span: Span, err: Error | String): boolean {
210+
const status = setSpanError(span, err);
211+
span.end();
212+
return status;
213+
}
214+
203215
/**
204216
* Sets err, if non-null onto the span with
205217
* status.code=ERROR and the message of err.toString()

src/partial-result-stream.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {Readable, Transform} from 'stream';
2424
import * as streamEvents from 'stream-events';
2525
import {grpc, CallOptions} from 'google-gax';
2626
import {DeadlineError, isRetryableInternalError} from './transaction-runner';
27-
import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument';
27+
import {Span} from './instrument';
2828
import {codec, JSONOptions, Json, Field, Value} from './codec';
2929
import {google} from '../protos/protos';
3030
import * as stream from 'stream';
@@ -97,6 +97,7 @@ export interface RowOptions {
9797
*/
9898
columnsMetadata?: object;
9999
gaxOptions?: CallOptions;
100+
span?: Span;
100101
}
101102

102103
/**
@@ -183,7 +184,7 @@ interface ResultEvents {
183184
export class PartialResultStream extends Transform implements ResultEvents {
184185
private _destroyed: boolean;
185186
private _fields!: google.spanner.v1.StructType.Field[];
186-
private _options: RowOptions;
187+
_options: RowOptions;
187188
private _pendingValue?: p.IValue;
188189
private _pendingValueForResume?: p.IValue;
189190
private _values: p.IValue[];
@@ -494,7 +495,6 @@ export function partialResultStream(
494495
let lastRequestStream: Readable;
495496
const startTime = Date.now();
496497
const timeout = options?.gaxOptions?.timeout ?? Infinity;
497-
const span = getActiveOrNoopSpan();
498498

499499
// mergeStream allows multiple streams to be connected into one. This is good;
500500
// if we need to retry a request and pipe more data to the user's stream.
@@ -569,7 +569,6 @@ export function partialResultStream(
569569
// checkpoint stream has queued. After that, we will destroy the
570570
// user's stream with the same error.
571571
setImmediate(() => batchAndSplitOnTokenStream.destroy(err));
572-
// setSpanErrorAndException(span, err as Error);
573572
return;
574573
}
575574

src/transaction-runner.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import {Session} from './session';
2323
import {Transaction} from './transaction';
2424
import {NormalCallback} from './common';
2525
import {isSessionNotFoundError} from './session-pool';
26-
import {getActiveOrNoopSpan} from './instrument';
26+
import {getActiveOrNoopSpan, setSpanErrorAndEnd} from './instrument';
2727
import {Database} from './database';
2828
import {google} from '../protos/protos';
2929
import IRequestOptions = google.spanner.v1.IRequestOptions;
@@ -314,9 +314,12 @@ export class TransactionRunner extends Runner<void> {
314314
transaction.requestStream = (config: object) => {
315315
const proxyStream = through.obj();
316316
const stream = requestStream(config);
317+
const resultStream = transaction.resultStream;
317318

318319
stream
319320
.on('error', (err: grpc.ServiceError) => {
321+
resultStream?._options.span &&
322+
setSpanErrorAndEnd(resultStream?._options.span, err);
320323
if (!this.shouldRetry(err)) {
321324
proxyStream.destroy(err);
322325
return;

src/transaction.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ export class Snapshot extends EventEmitter {
287287
readTimestampProto?: spannerClient.protobuf.ITimestamp;
288288
request: (config: {}, callback: Function) => void;
289289
requestStream: (config: {}) => Readable;
290+
resultStream?: PartialResultStream;
290291
session: Session;
291292
queryOptions?: IQueryOptions;
292293
resourceHeader_: {[k: string]: string};
@@ -751,6 +752,7 @@ export class Snapshot extends EventEmitter {
751752
maxResumeRetries,
752753
columnsMetadata,
753754
gaxOptions,
755+
span,
754756
}
755757
)
756758
?.on('response', response => {
@@ -789,6 +791,7 @@ export class Snapshot extends EventEmitter {
789791
});
790792
}
791793

794+
this.resultStream = resultStream;
792795
return resultStream;
793796
});
794797
}
@@ -1332,6 +1335,7 @@ export class Snapshot extends EventEmitter {
13321335
maxResumeRetries,
13331336
columnsMetadata,
13341337
gaxOptions,
1338+
span,
13351339
}
13361340
)
13371341
.on('response', response => {
@@ -1371,6 +1375,7 @@ export class Snapshot extends EventEmitter {
13711375
});
13721376
}
13731377

1378+
this.resultStream = resultStream;
13741379
return resultStream;
13751380
});
13761381
}

test/transaction.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ describe('Transaction', () => {
410410
assert.strictEqual(reqOpts.jsonOptions, undefined);
411411
assert.strictEqual(reqOpts.maxResumeRetries, undefined);
412412

413-
const options = PARTIAL_RESULT_STREAM.lastCall.args[1];
413+
const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1];
414414

415415
assert.deepStrictEqual(options, fakeOptions);
416416
});
@@ -791,7 +791,7 @@ describe('Transaction', () => {
791791
assert.strictEqual(reqOpts.jsonOptions, undefined);
792792
assert.strictEqual(reqOpts.maxResumeRetries, undefined);
793793

794-
const options = PARTIAL_RESULT_STREAM.lastCall.args[1];
794+
const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1];
795795

796796
assert.deepStrictEqual(options, expectedOptions);
797797
});

0 commit comments

Comments
 (0)