Skip to content

Commit

Permalink
Addressing a few issues regarding the OP reentry scenarios (#16176)
Browse files Browse the repository at this point in the history
## Description

Related to ADO:4537.

[Grouped
batching](https://github.com/microsoft/FluidFramework/tree/main/packages/runtime/container-runtime/src/opLifecycle#grouped-batching)
is problematic in the context of op reentry, as all ops within the same
batch have the same sequence number. A reentrant op (created during the
execution of a 'changed' event callback of a DDS) will have a different
baseline than the rest of the batch and with the erasure of sequence
numbers, it is impossible for the DDSes to properly apply the op. This
PR addresses this issue by detecting batches with reentrant ops and
rebasing them. Rebasing a batch means resubmitting it to the runtime to
have all its ops in agreement re reference sequence numbers and sequence
numbers. The issue is showcased in the `Eventual consistency with op
reentry` end to end test, which is currently disabled until the DDS
supports rebasing, which will happen very soon. The test proves that
enabling group batching risks creating inconsistent data models in the
op reentry scenario.

In a nutshell, this change will force the offending client to reorder
its batch before it is sent, ensuring that all clients have the same
view of the data model.

### Changes included in this PR:

- allow for rebasing ops, opt-in (disabled by default) via runtime
options with a feature gate override for shutting it down.
- increase the captured stack length whenever we detect op reentry. This
will be useful to identify problematic customer scenarios to leave our
options open to possibly disable op reentry altogether in the future.
- flushing is blocked inside DDS event handlers. This is to simplify the
possible states in which the runtime can end up and also ensure that a
client cannot spam the backend with ops in case of unbounded recursion.
  • Loading branch information
andre4i authored Jun 28, 2023
1 parent 3a45679 commit ecf480c
Show file tree
Hide file tree
Showing 12 changed files with 551 additions and 93 deletions.
1 change: 1 addition & 0 deletions api-report/container-runtime.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ export interface IConnectableRuntime {
export interface IContainerRuntimeOptions {
readonly chunkSizeInBytes?: number;
readonly compressionOptions?: ICompressionRuntimeOptions;
readonly enableBatchRebasing?: boolean;
readonly enableGroupedBatching?: boolean;
readonly enableOpReentryCheck?: boolean;
readonly enableRuntimeIdCompressor?: boolean;
Expand Down
28 changes: 27 additions & 1 deletion packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ import {
OpSplitter,
RemoteMessageProcessor,
OpGroupingManager,
getLongStack,
} from "./opLifecycle";
import { DeltaManagerSummarizerProxy } from "./deltaManagerSummarizerProxy";

Expand Down Expand Up @@ -435,9 +436,24 @@ export interface IContainerRuntimeOptions {
* By default, the feature is disabled. If enabled from options, the `Fluid.ContainerRuntime.DisableGroupedBatching`
* flag can be used to disable it at runtime.
*
* For safety, {@link IContainerRuntimeOptions#enableBatchRebasing} needs to also be enabled to ensure
* consistency across clients.
*
* @experimental Not ready for use.
*/
readonly enableGroupedBatching?: boolean;
/**
* Configures if the runtime should rebase a batch of ops when it detects op reentrancy,
* when an op is created as the result of processing another op. Usually this is the case
* when changes are made to a DDS inside a DDS 'onChanged' event handler. This means that the
* reentrant op will have a different reference sequence number than the rest of the ops in
* the batch, resulting in a different view of the state of the data model. Therefore all ops
* must be resubmitted and rebased to the current reference sequence number to be in agreement
* about the state of the data model.
*
* @experimental Not ready for use.
*/
readonly enableBatchRebasing?: boolean;
}

/**
Expand Down Expand Up @@ -685,6 +701,7 @@ export class ContainerRuntime
chunkSizeInBytes = defaultChunkSizeInBytes,
enableOpReentryCheck = false,
enableGroupedBatching = false,
enableBatchRebasing = false,
} = runtimeOptions;

const registry = new FluidDataStoreRegistry(registryEntries);
Expand Down Expand Up @@ -783,6 +800,7 @@ export class ContainerRuntime
enableRuntimeIdCompressor,
enableOpReentryCheck,
enableGroupedBatching,
enableBatchRebasing,
},
containerScope,
logger,
Expand Down Expand Up @@ -1329,6 +1347,9 @@ export class ContainerRuntime
const disablePartialFlush = this.mc.config.getBoolean(
"Fluid.ContainerRuntime.DisablePartialFlush",
);
const enableBatchRebasing =
runtimeOptions.enableBatchRebasing &&
this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableBatchRebasing") !== true;
this.outbox = new Outbox({
shouldSend: () => this.canSendOps(),
pendingStateManager: this.pendingStateManager,
Expand All @@ -1339,13 +1360,17 @@ export class ContainerRuntime
compressionOptions,
maxBatchSizeInBytes: runtimeOptions.maxBatchSizeInBytes,
disablePartialFlush: disablePartialFlush === true,
enableBatchRebasing,
},
logger: this.mc.logger,
groupingManager: opGroupingManager,
getCurrentSequenceNumbers: () => ({
referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
clientSequenceNumber: this._processedClientSequenceNumber,
}),
reSubmit: this.reSubmit.bind(this),
opReentrancy: () => this.ensureNoDataModelChangesCalls > 0,
closeContainer: this.closeFn,
});

this.context.quorum.on("removeMember", (clientId: string) => {
Expand Down Expand Up @@ -1506,6 +1531,7 @@ export class ContainerRuntime
idCompressorEnabled: this.idCompressorEnabled,
summaryStateUpdateMethod: this.summaryStateUpdateMethod,
closeSummarizerDelayOverride,
enableBatchRebasing,
}),
telemetryDocumentId: this.telemetryDocumentId,
groupedBatchingEnabled: this.groupedBatchingEnabled,
Expand Down Expand Up @@ -3194,7 +3220,7 @@ export class ContainerRuntime
this.mc.logger.sendTelemetryEvent(
{ eventName: "OpReentry" },
// We need to capture the call stack in order to inspect the source of this usage pattern
new UsageError(errorMessage),
getLongStack(() => new UsageError(errorMessage)),
);
this.opReentryCallsToReport--;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const opOverhead = 200;
export class BatchManager {
private pendingBatch: BatchMessage[] = [];
private batchContentSize = 0;
private hasReentrantOps = false;

public get length() {
return this.pendingBatch.length;
Expand All @@ -54,9 +55,14 @@ export class BatchManager {

constructor(public readonly options: IBatchManagerOptions) {}

public push(message: BatchMessage, currentClientSequenceNumber?: number): boolean {
public push(
message: BatchMessage,
reentrant: boolean,
currentClientSequenceNumber?: number,
): boolean {
const contentSize = this.batchContentSize + (message.contents?.length ?? 0);
const opCount = this.pendingBatch.length;
this.hasReentrantOps = this.hasReentrantOps || reentrant;

// Attempt to estimate batch size, aka socket message size.
// Each op has pretty large envelope, estimating to be 200 bytes.
Expand Down Expand Up @@ -100,11 +106,13 @@ export class BatchManager {
content: this.pendingBatch,
contentSizeInBytes: this.batchContentSize,
referenceSequenceNumber: this.referenceSequenceNumber,
hasReentrantOps: this.hasReentrantOps,
};

this.pendingBatch = [];
this.batchContentSize = 0;
this.clientSequenceNumber = undefined;
this.hasReentrantOps = false;

return addBatchMetadata(batch);
}
Expand Down
11 changes: 11 additions & 0 deletions packages/runtime/container-runtime/src/opLifecycle/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ export interface IBatch {
* The reference sequence number for the batch
*/
readonly referenceSequenceNumber: number | undefined;
/**
* Wether or not the batch contains at least one op which was produced as the result
* of processing another op. This means that the batch must be rebased before
* submitted, to ensure that all ops have the same reference sequence numbers and a
* consistent view of the data model. This happens when the op is created within a
* 'changed' event handler of a DDS and will have a different reference sequence number
* than the rest of the ops in the batch, meaning that it has a different view of the
* state of the data model, therefore all ops must be resubmitted and rebased to the current
* reference sequence number to be in agreement about the data model state.
*/
readonly hasReentrantOps?: boolean;
}

export interface IBatchCheckpoint {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export {
IChunkedOp,
IMessageProcessingResult,
} from "./definitions";
export { Outbox } from "./outbox";
export { Outbox, getLongStack } from "./outbox";
export { OpCompressor } from "./opCompressor";
export { OpDecompressor } from "./opDecompressor";
export { OpSplitter, splitOp } from "./opSplitter";
Expand Down
123 changes: 107 additions & 16 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import {
MonitoringContext,
} from "@fluidframework/telemetry-utils";
import { assert } from "@fluidframework/common-utils";
import { IContainerContext } from "@fluidframework/container-definitions";
import { IContainerContext, ICriticalContainerError } from "@fluidframework/container-definitions";
import { GenericError, UsageError } from "@fluidframework/container-utils";
import { MessageType } from "@fluidframework/protocol-definitions";
import { ICompressionRuntimeOptions } from "../containerRuntime";
import { PendingStateManager } from "../pendingStateManager";
import { IPendingBatchMessage, PendingStateManager } from "../pendingStateManager";
import {
BatchManager,
BatchSequenceNumbers,
Expand All @@ -31,6 +31,7 @@ export interface IOutboxConfig {
// The maximum size of a batch that we can send over the wire.
readonly maxBatchSizeInBytes: number;
readonly disablePartialFlush: boolean;
readonly enableBatchRebasing: boolean;
}

export interface IOutboxParameters {
Expand All @@ -43,18 +44,40 @@ export interface IOutboxParameters {
readonly logger: ITelemetryLoggerExt;
readonly groupingManager: OpGroupingManager;
readonly getCurrentSequenceNumbers: () => BatchSequenceNumbers;
readonly reSubmit: (message: IPendingBatchMessage) => void;
readonly opReentrancy: () => boolean;
readonly closeContainer: (error?: ICriticalContainerError) => void;
}

function getLongStack(action: () => Error): Error {
// Increase the stack trace limit temporarily, so as to debug better in case it occurs.
/**
* Temporarily increase the stack limit while executing the provided action.
* If a negative value is provided for `length`, no stack frames will be collected.
* If Infinity is provided, all frames will be collected.
*
* ADO:4663 - add this to the common packages.
*
* @param action - action which returns an error
* @param length - number of stack frames to collect, 50 if unspecified.
* @returns the result of the action provided
*/
export function getLongStack<T>(action: () => T, length: number = 50): T {
const errorObj = Error as any;
if (
(
Object.getOwnPropertyDescriptor(errorObj, "stackTraceLimit") ||
Object.getOwnPropertyDescriptor(Object.getPrototypeOf(errorObj), "stackTraceLimit") ||
{}
).writable !== true
) {
return action();
}

const originalStackTraceLimit = errorObj.stackTraceLimit;
try {
const originalStackTraceLimit = (Error as any).stackTraceLimit;
(Error as any).stackTraceLimit = 50;
const result = action();
(Error as any).stackTraceLimit = originalStackTraceLimit;
return result;
} catch (error) {
errorObj.stackTraceLimit = length;
return action();
} finally {
errorObj.stackTraceLimit = originalStackTraceLimit;
}
}

Expand All @@ -63,6 +86,8 @@ export class Outbox {
private readonly attachFlowBatch: BatchManager;
private readonly mainBatch: BatchManager;
private readonly defaultAttachFlowSoftLimitInBytes = 320 * 1024;
private batchRebasesToReport = 5;
private rebasing = false;

/**
* Track the number of ops which were detected to have a mismatched
Expand Down Expand Up @@ -132,7 +157,7 @@ export class Outbox {
}

if (!this.params.config.disablePartialFlush) {
this.flush();
this.flushAll();
}
}

Expand All @@ -142,6 +167,7 @@ export class Outbox {
if (
!this.mainBatch.push(
message,
this.isContextReentrant(),
this.params.getCurrentSequenceNumbers().clientSequenceNumber,
)
) {
Expand All @@ -160,16 +186,18 @@ export class Outbox {
if (
!this.attachFlowBatch.push(
message,
this.isContextReentrant(),
this.params.getCurrentSequenceNumbers().clientSequenceNumber,
)
) {
// BatchManager has two limits - soft limit & hard limit. Soft limit is only engaged
// when queue is not empty.
// Flush queue & retry. Failure on retry would mean - single message is bigger than hard limit
this.flushInternal(this.attachFlowBatch.popBatch());
this.flushInternal(this.attachFlowBatch);
if (
!this.attachFlowBatch.push(
message,
this.isContextReentrant(),
this.params.getCurrentSequenceNumbers().clientSequenceNumber,
)
) {
Expand All @@ -191,22 +219,85 @@ export class Outbox {
this.attachFlowBatch.contentSizeInBytes >=
this.params.config.compressionOptions.minimumBatchSizeInBytes
) {
this.flushInternal(this.attachFlowBatch.popBatch());
this.flushInternal(this.attachFlowBatch);
}
}

public flush() {
this.flushInternal(this.attachFlowBatch.popBatch());
this.flushInternal(this.mainBatch.popBatch());
if (this.isContextReentrant()) {
const error = new UsageError("Flushing is not supported inside DDS event handlers");
this.params.closeContainer(error);
throw error;
}

this.flushAll();
}

private flushAll() {
this.flushInternal(this.attachFlowBatch);
this.flushInternal(this.mainBatch);
}

private flushInternal(rawBatch: IBatch) {
private flushInternal(batchManager: BatchManager) {
if (batchManager.empty) {
return;
}

const rawBatch = batchManager.popBatch();
if (rawBatch.hasReentrantOps === true && this.params.config.enableBatchRebasing) {
assert(!this.rebasing, "A rebased batch should never have reentrant ops");
// If a batch contains reentrant ops (ops created as a result from processing another op)
// it needs to be rebased so that we can ensure consistent reference sequence numbers
// and eventual consistency at the DDS level.
this.rebase(rawBatch, batchManager);
return;
}

const processedBatch = this.compressBatch(rawBatch);
this.sendBatch(processedBatch);

this.persistBatch(rawBatch.content);
}

/**
* Rebases a batch. All the ops in the batch are resubmitted to the runtime and
* they will end up back in the same batch manager they were flushed from and subsequently flushed.
*
* @param rawBatch - the batch to be rebased
*/
private rebase(rawBatch: IBatch, batchManager: BatchManager) {
assert(!this.rebasing, "Reentrancy");

this.rebasing = true;
for (const message of rawBatch.content) {
this.params.reSubmit({
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
content: message.contents!,
localOpMetadata: message.localOpMetadata,
opMetadata: message.metadata,
});
}

if (this.batchRebasesToReport > 0) {
this.mc.logger.sendTelemetryEvent(
{
eventName: "BatchRebase",
length: rawBatch.content.length,
referenceSequenceNumber: rawBatch.referenceSequenceNumber,
},
new UsageError("BatchRebase"),
);
this.batchRebasesToReport--;
}

this.flushInternal(batchManager);
this.rebasing = false;
}

private isContextReentrant(): boolean {
return this.params.opReentrancy() && !this.rebasing;
}

private compressBatch(batch: IBatch): IBatch {
if (
batch.content.length === 0 ||
Expand Down
Loading

0 comments on commit ecf480c

Please sign in to comment.