Skip to content

Commit

Permalink
Merge pull request #2884 from murgatroid99/grpc-js_call_committed_con…
Browse files Browse the repository at this point in the history
…sistency

grpc-js: Always use RetryingCall, always call onCommitted
  • Loading branch information
murgatroid99 authored Jan 15, 2025
2 parents 5a942ed + a5b6178 commit 908c22a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 35 deletions.
27 changes: 0 additions & 27 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,33 +705,6 @@ export class InternalChannel {
);
}

createInnerCall(
callConfig: CallConfig,
method: string,
host: string,
credentials: CallCredentials,
deadline: Deadline
): LoadBalancingCall | RetryingCall {
// Create a RetryingCall if retries are enabled
if (this.options['grpc.enable_retries'] === 0) {
return this.createLoadBalancingCall(
callConfig,
method,
host,
credentials,
deadline
);
} else {
return this.createRetryingCall(
callConfig,
method,
host,
credentials,
deadline
);
}
}

createResolvingCall(
method: string,
deadline: Deadline,
Expand Down
1 change: 0 additions & 1 deletion packages/grpc-js/src/load-balancing-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ export class LoadBalancingCall implements Call, DeadlineInfoProvider {
);
return;
}
this.callConfig.onCommitted?.();
pickResult.onCallStarted?.();
this.onCallEnded = pickResult.onCallEnded;
this.trace(
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js/src/resolving-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ export class ResolvingCall implements Call {
this.filterStack = this.filterStackFactory.createFilter();
this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(
filteredMetadata => {
this.child = this.channel.createInnerCall(
this.child = this.channel.createRetryingCall(
config,
this.method,
this.host,
Expand Down
27 changes: 21 additions & 6 deletions packages/grpc-js/src/retrying-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ interface UnderlyingCall {
* transparent retry attempts may still be sent
* COMMITTED: One attempt is committed, and no new attempts will be
* sent
* NO_RETRY: Retries are disabled. Exists to track the transition to COMMITTED
*/
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED' | 'NO_RETRY';

/**
* The different types of objects that can be stored in the write buffer, with
Expand Down Expand Up @@ -229,6 +230,9 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
} else if (callConfig.methodConfig.hedgingPolicy) {
this.state = 'HEDGING';
this.maxAttempts = Math.min(callConfig.methodConfig.hedgingPolicy.maxAttempts, maxAttemptsLimit);
} else if (channel.getOptions()['grpc.enable_retries'] === 0) {
this.state = 'NO_RETRY';
this.maxAttempts = 1;
} else {
this.state = 'TRANSPARENT_ONLY';
this.maxAttempts = 1;
Expand Down Expand Up @@ -318,8 +322,15 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
if (this.state !== 'COMMITTED') {
return;
}
const earliestNeededMessageIndex =
this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
let earliestNeededMessageIndex: number;
if (this.underlyingCalls[this.committedCallIndex!].state === 'COMPLETED') {
/* If the committed call is completed, clear all messages, even if some
* have not been sent. */
earliestNeededMessageIndex = this.getNextBufferIndex();
} else {
earliestNeededMessageIndex =
this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
}
for (
let messageIndex = this.writeBufferOffset;
messageIndex < earliestNeededMessageIndex;
Expand All @@ -343,16 +354,14 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
if (this.state === 'COMMITTED') {
return;
}
if (this.underlyingCalls[index].state === 'COMPLETED') {
return;
}
this.trace(
'Committing call [' +
this.underlyingCalls[index].call.getCallNumber() +
'] at index ' +
index
);
this.state = 'COMMITTED';
this.callConfig.onCommitted?.();
this.committedCallIndex = index;
for (let i = 0; i < this.underlyingCalls.length; i++) {
if (i === index) {
Expand Down Expand Up @@ -471,6 +480,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
) {
switch (this.state) {
case 'COMMITTED':
case 'NO_RETRY':
case 'TRANSPARENT_ONLY':
this.commitCall(callIndex);
this.reportStatus(status);
Expand Down Expand Up @@ -566,6 +576,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
this.reportStatus(status);
return;
}
if (this.state === 'NO_RETRY') {
this.commitCall(callIndex);
this.reportStatus(status);
return;
}
if (this.state === 'COMMITTED') {
this.reportStatus(status);
return;
Expand Down

0 comments on commit 908c22a

Please sign in to comment.