diff --git a/api-report/container-runtime.api.md b/api-report/container-runtime.api.md index 604059bbb535..90419e12ddfc 100644 --- a/api-report/container-runtime.api.md +++ b/api-report/container-runtime.api.md @@ -353,6 +353,7 @@ export interface IConnectableRuntime { export interface IContainerRuntimeOptions { readonly chunkSizeInBytes?: number; readonly compressionOptions?: ICompressionRuntimeOptions; + readonly enableBatchRebasing?: boolean; readonly enableGroupedBatching?: boolean; readonly enableOpReentryCheck?: boolean; readonly enableRuntimeIdCompressor?: boolean; diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index bbde86904688..acdaccc8bc37 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -179,6 +179,7 @@ import { OpSplitter, RemoteMessageProcessor, OpGroupingManager, + getLongStack, } from "./opLifecycle"; import { DeltaManagerSummarizerProxy } from "./deltaManagerSummarizerProxy"; @@ -435,9 +436,24 @@ export interface IContainerRuntimeOptions { * By default, the feature is disabled. If enabled from options, the `Fluid.ContainerRuntime.DisableGroupedBatching` * flag can be used to disable it at runtime. * + * For safety, {@link IContainerRuntimeOptions#enableBatchRebasing} must also be enabled to ensure + * consistency across clients. + * * @experimental Not ready for use. */ readonly enableGroupedBatching?: boolean; + /** + * Configures whether the runtime should rebase a batch of ops when it detects op reentrancy, + * i.e. when an op is created as the result of processing another op. Usually this is the case + * when changes are made to a DDS inside a DDS 'onChanged' event handler. This means that the + * reentrant op will have a different reference sequence number than the rest of the ops in + * the batch, resulting in a different view of the state of the data model. Therefore, all ops + * must be resubmitted and rebased to the current reference sequence number to be in agreement + * about the state of the data model. + * + * @experimental Not ready for use. 
+ */ + readonly enableBatchRebasing?: boolean; } /** @@ -685,6 +701,7 @@ export class ContainerRuntime chunkSizeInBytes = defaultChunkSizeInBytes, enableOpReentryCheck = false, enableGroupedBatching = false, + enableBatchRebasing = false, } = runtimeOptions; const registry = new FluidDataStoreRegistry(registryEntries); @@ -783,6 +800,7 @@ export class ContainerRuntime enableRuntimeIdCompressor, enableOpReentryCheck, enableGroupedBatching, + enableBatchRebasing, }, containerScope, logger, @@ -1329,6 +1347,9 @@ export class ContainerRuntime const disablePartialFlush = this.mc.config.getBoolean( "Fluid.ContainerRuntime.DisablePartialFlush", ); + const enableBatchRebasing = + runtimeOptions.enableBatchRebasing && + this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableBatchRebasing") !== true; this.outbox = new Outbox({ shouldSend: () => this.canSendOps(), pendingStateManager: this.pendingStateManager, @@ -1339,6 +1360,7 @@ export class ContainerRuntime compressionOptions, maxBatchSizeInBytes: runtimeOptions.maxBatchSizeInBytes, disablePartialFlush: disablePartialFlush === true, + enableBatchRebasing, }, logger: this.mc.logger, groupingManager: opGroupingManager, @@ -1346,6 +1368,9 @@ export class ContainerRuntime referenceSequenceNumber: this.deltaManager.lastSequenceNumber, clientSequenceNumber: this._processedClientSequenceNumber, }), + reSubmit: this.reSubmit.bind(this), + opReentrancy: () => this.ensureNoDataModelChangesCalls > 0, + closeContainer: this.closeFn, }); this.context.quorum.on("removeMember", (clientId: string) => { @@ -1506,6 +1531,7 @@ export class ContainerRuntime idCompressorEnabled: this.idCompressorEnabled, summaryStateUpdateMethod: this.summaryStateUpdateMethod, closeSummarizerDelayOverride, + enableBatchRebasing, }), telemetryDocumentId: this.telemetryDocumentId, groupedBatchingEnabled: this.groupedBatchingEnabled, @@ -3194,7 +3220,7 @@ export class ContainerRuntime this.mc.logger.sendTelemetryEvent( { eventName: "OpReentry" }, // We need to capture the call stack in order to inspect the source of this usage pattern - new UsageError(errorMessage), + getLongStack(() => new UsageError(errorMessage)), ); this.opReentryCallsToReport--; } diff --git a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts index 1c8c1376d0c0..3fbc17e18e15 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts @@ -29,6 +29,7 @@ const opOverhead = 200; export class BatchManager { private pendingBatch: BatchMessage[] = []; private batchContentSize = 0; + private hasReentrantOps = false; public get length() { return this.pendingBatch.length; @@ -54,9 +55,14 @@ export class BatchManager { constructor(public readonly options: IBatchManagerOptions) {} - public push(message: BatchMessage, currentClientSequenceNumber?: number): boolean { + public push( + message: BatchMessage, + reentrant: boolean, + currentClientSequenceNumber?: number, + ): boolean { const contentSize = this.batchContentSize + (message.contents?.length ?? 0); const opCount = this.pendingBatch.length; + this.hasReentrantOps = this.hasReentrantOps || reentrant; // Attempt to estimate batch size, aka socket message size. // Each op has pretty large envelope, estimating to be 200 bytes. 
@@ -100,11 +106,13 @@ export class BatchManager { content: this.pendingBatch, contentSizeInBytes: this.batchContentSize, referenceSequenceNumber: this.referenceSequenceNumber, + hasReentrantOps: this.hasReentrantOps, }; this.pendingBatch = []; this.batchContentSize = 0; this.clientSequenceNumber = undefined; + this.hasReentrantOps = false; return addBatchMetadata(batch); } diff --git a/packages/runtime/container-runtime/src/opLifecycle/definitions.ts b/packages/runtime/container-runtime/src/opLifecycle/definitions.ts index 9a94e0f3060c..9895829d9e62 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/definitions.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/definitions.ts @@ -34,6 +34,17 @@ export interface IBatch { * The reference sequence number for the batch */ readonly referenceSequenceNumber: number | undefined; + /** + * Whether or not the batch contains at least one op which was produced as the result + * of processing another op. This means that the batch must be rebased before + * being submitted, to ensure that all ops have the same reference sequence number and a + * consistent view of the data model. This happens when the op is created within a + * 'changed' event handler of a DDS and will have a different reference sequence number + * than the rest of the ops in the batch, meaning that it has a different view of the + * state of the data model; therefore all ops must be resubmitted and rebased to the current + * reference sequence number to be in agreement about the data model state. + */ + readonly hasReentrantOps?: boolean; } export interface IBatchCheckpoint { diff --git a/packages/runtime/container-runtime/src/opLifecycle/index.ts b/packages/runtime/container-runtime/src/opLifecycle/index.ts index 6aa0a69aa7e8..4e7830db881a 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/index.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/index.ts @@ -11,7 +11,7 @@ export { IChunkedOp, IMessageProcessingResult, } from "./definitions"; -export { Outbox } from "./outbox"; +export { Outbox, getLongStack } from "./outbox"; export { OpCompressor } from "./opCompressor"; export { OpDecompressor } from "./opDecompressor"; export { OpSplitter, splitOp } from "./opSplitter"; diff --git a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts index 9ec84fa8072e..7e1fd8d655e8 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts @@ -10,11 +10,11 @@ import { MonitoringContext, } from "@fluidframework/telemetry-utils"; import { assert } from "@fluidframework/common-utils"; -import { IContainerContext } from "@fluidframework/container-definitions"; +import { IContainerContext, ICriticalContainerError } from "@fluidframework/container-definitions"; import { GenericError, UsageError } from "@fluidframework/container-utils"; import { MessageType } from "@fluidframework/protocol-definitions"; import { ICompressionRuntimeOptions } from "../containerRuntime"; -import { PendingStateManager } from "../pendingStateManager"; +import { IPendingBatchMessage, PendingStateManager } from "../pendingStateManager"; import { BatchManager, BatchSequenceNumbers, @@ -31,6 +31,7 @@ export interface IOutboxConfig { // The maximum size of a batch that we can send over the wire. 
readonly maxBatchSizeInBytes: number; readonly disablePartialFlush: boolean; + readonly enableBatchRebasing: boolean; } export interface IOutboxParameters { @@ -43,18 +44,40 @@ export interface IOutboxParameters { readonly logger: ITelemetryLoggerExt; readonly groupingManager: OpGroupingManager; readonly getCurrentSequenceNumbers: () => BatchSequenceNumbers; + readonly reSubmit: (message: IPendingBatchMessage) => void; + readonly opReentrancy: () => boolean; + readonly closeContainer: (error?: ICriticalContainerError) => void; } -function getLongStack(action: () => Error): Error { - // Increase the stack trace limit temporarily, so as to debug better in case it occurs. +/** + * Temporarily increase the stack limit while executing the provided action. + * If a negative value is provided for `length`, no stack frames will be collected. + * If Infinity is provided, all frames will be collected. + * + * ADO:4663 - add this to the common packages. + * + * @param action - action which returns an error + * @param length - number of stack frames to collect, 50 if unspecified. + * @returns the result of the action provided + */ +export function getLongStack(action: () => T, length: number = 50): T { + const errorObj = Error as any; + if ( + ( + Object.getOwnPropertyDescriptor(errorObj, "stackTraceLimit") || + Object.getOwnPropertyDescriptor(Object.getPrototypeOf(errorObj), "stackTraceLimit") || + {} + ).writable !== true + ) { + return action(); + } + + const originalStackTraceLimit = errorObj.stackTraceLimit; try { - const originalStackTraceLimit = (Error as any).stackTraceLimit; - (Error as any).stackTraceLimit = 50; - const result = action(); - (Error as any).stackTraceLimit = originalStackTraceLimit; - return result; - } catch (error) { + errorObj.stackTraceLimit = length; return action(); + } finally { + errorObj.stackTraceLimit = originalStackTraceLimit; } } @@ -63,6 +86,8 @@ export class Outbox { private readonly attachFlowBatch: BatchManager; private readonly mainBatch: BatchManager; private readonly defaultAttachFlowSoftLimitInBytes = 320 * 1024; + private batchRebasesToReport = 5; + private rebasing = false; /** * Track the number of ops which were detected to have a mismatched @@ -132,7 +157,7 @@ export class Outbox { } if (!this.params.config.disablePartialFlush) { - this.flush(); + this.flushAll(); } } @@ -142,6 +167,7 @@ export class Outbox { if ( !this.mainBatch.push( message, + this.isContextReentrant(), this.params.getCurrentSequenceNumbers().clientSequenceNumber, ) ) { @@ -160,16 +186,18 @@ export class Outbox { if ( !this.attachFlowBatch.push( message, + this.isContextReentrant(), this.params.getCurrentSequenceNumbers().clientSequenceNumber, ) ) { // BatchManager has two limits - soft limit & hard limit. Soft limit is only engaged // when queue is not empty. // Flush queue & retry. 
Failure on retry would mean - single message is bigger than hard limit - this.flushInternal(this.attachFlowBatch.popBatch()); + this.flushInternal(this.attachFlowBatch); if ( !this.attachFlowBatch.push( message, + this.isContextReentrant(), this.params.getCurrentSequenceNumbers().clientSequenceNumber, ) ) { @@ -191,22 +219,85 @@ this.attachFlowBatch.contentSizeInBytes >= this.params.config.compressionOptions.minimumBatchSizeInBytes ) { - this.flushInternal(this.attachFlowBatch.popBatch()); + this.flushInternal(this.attachFlowBatch); } } public flush() { - this.flushInternal(this.attachFlowBatch.popBatch()); - this.flushInternal(this.mainBatch.popBatch()); + if (this.isContextReentrant()) { + const error = new UsageError("Flushing is not supported inside DDS event handlers"); + this.params.closeContainer(error); + throw error; + } + + this.flushAll(); + } + + private flushAll() { + this.flushInternal(this.attachFlowBatch); + this.flushInternal(this.mainBatch); } - private flushInternal(rawBatch: IBatch) { + private flushInternal(batchManager: BatchManager) { + if (batchManager.empty) { + return; + } + + const rawBatch = batchManager.popBatch(); + if (rawBatch.hasReentrantOps === true && this.params.config.enableBatchRebasing) { + assert(!this.rebasing, "A rebased batch should never have reentrant ops"); + // If a batch contains reentrant ops (ops created as a result of processing another op), + // it needs to be rebased so that we can ensure consistent reference sequence numbers + // and eventual consistency at the DDS level. + this.rebase(rawBatch, batchManager); + return; + } + const processedBatch = this.compressBatch(rawBatch); this.sendBatch(processedBatch); this.persistBatch(rawBatch.content); } + /** + * Rebases a batch. All the ops in the batch are resubmitted to the runtime and + * they will end up back in the same batch manager they were flushed from, and are subsequently flushed. 
+ * + * @param rawBatch - the batch to be rebased + */ + private rebase(rawBatch: IBatch, batchManager: BatchManager) { + assert(!this.rebasing, "Reentrancy"); + + this.rebasing = true; + for (const message of rawBatch.content) { + this.params.reSubmit({ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + content: message.contents!, + localOpMetadata: message.localOpMetadata, + opMetadata: message.metadata, + }); + } + + if (this.batchRebasesToReport > 0) { + this.mc.logger.sendTelemetryEvent( + { + eventName: "BatchRebase", + length: rawBatch.content.length, + referenceSequenceNumber: rawBatch.referenceSequenceNumber, + }, + new UsageError("BatchRebase"), + ); + this.batchRebasesToReport--; + } + + this.flushInternal(batchManager); + this.rebasing = false; + } + + private isContextReentrant(): boolean { + return this.params.opReentrancy() && !this.rebasing; + } + private compressBatch(batch: IBatch): IBatch { if ( batch.content.length === 0 || diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index dfb439a43057..85ff87b73d33 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -431,6 +431,40 @@ describe("Runtime", () => { })), ); }); + + it("Can't call flush() inside ensureNoDataModelChanges's callback", async () => { + containerRuntime = await ContainerRuntime.load( + getMockContext() as IContainerContext, + [], + undefined, // requestHandler + { + flushMode: FlushMode.Immediate, + }, // runtimeOptions + ); + + assert.throws(() => + containerRuntime.ensureNoDataModelChanges(() => { + containerRuntime.orderSequentially(() => {}); + }), + ); + }); + + it("Can't create an infinite ensureNoDataModelChanges recursive call ", async () => { + containerRuntime = await ContainerRuntime.load( + getMockContext() as IContainerContext, + [], + undefined, // requestHandler + {}, // runtimeOptions + ); + + const callback = () => { + containerRuntime.ensureNoDataModelChanges(() => { + containerRuntime.submitDataStoreOp("id", "test"); + callback(); + }); + }; + assert.throws(() => callback()); + }); }); describe("orderSequentially with rollback", () => @@ -1148,6 +1182,7 @@ describe("Runtime", () => { gcAllowed: true, }, flushMode: FlushModeExperimental.Async as unknown as FlushMode, + enableBatchRebasing: true, }; const defaultRuntimeOptions = { @@ -1164,6 +1199,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: false, enableOpReentryCheck: false, enableGroupedBatching: false, + enableBatchRebasing: false, }; const mergedRuntimeOptions = { ...defaultRuntimeOptions, ...runtimeOptions }; @@ -1180,7 +1216,10 @@ describe("Runtime", () => { eventName: "ContainerLoadStats", category: "generic", options: JSON.stringify(mergedRuntimeOptions), - featureGates: JSON.stringify({ idCompressorEnabled: false }), + featureGates: JSON.stringify({ + idCompressorEnabled: false, + enableBatchRebasing: true, + }), }, ]); }); @@ -1191,6 +1230,7 @@ describe("Runtime", () => { "Fluid.ContainerRuntime.CompressionChunkingDisabled": true, "Fluid.ContainerRuntime.DisableOpReentryCheck": false, "Fluid.ContainerRuntime.IdCompressorEnabled": true, + "Fluid.ContainerRuntime.DisableBatchRebasing": true, }; await ContainerRuntime.loadRuntime({ context: localGetMockContext(featureGates) as IContainerContext, @@ -1209,6 +1249,7 @@ describe("Runtime", () => { disableOpReentryCheck: false, 
disableChunking: true, idCompressorEnabled: true, + enableBatchRebasing: false, }), }, ]); diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts index 4a38dfe5eb23..ac5802032f4f 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts @@ -26,15 +26,15 @@ describe("BatchManager", () => { const batchManager = new BatchManager({ hardLimit, softLimit }); // Can push one large message - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 1); // Can't push another large message - assert.equal(batchManager.push(message), false); + assert.equal(batchManager.push(message, /* reentrant */ false), false); assert.equal(batchManager.length, 1); // But can push one small message - assert.equal(batchManager.push(smallMessage()), true); + assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); assert.equal(batchManager.length, 2); // Pop and check batch @@ -43,10 +43,10 @@ describe("BatchManager", () => { assert.equal(batch.contentSizeInBytes, softLimit / 2 + smallMessageSize); // Validate that can push large message again - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 1); - assert.equal(batchManager.push(message), false); + assert.equal(batchManager.push(message, /* reentrant */ false), false); assert.equal(batchManager.length, 1); }); @@ -55,11 +55,11 @@ describe("BatchManager", () => { const batchManager = new BatchManager({ hardLimit, softLimit }); // Can push one large message, even above soft limit - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 1); // Can't push another small message - assert.equal(batchManager.push(smallMessage()), false); + assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), false); assert.equal(batchManager.length, 1); // Pop and check batch @@ -68,10 +68,10 @@ describe("BatchManager", () => { assert.equal(batch.contentSizeInBytes, softLimit * 2); // Validate that we can't push large message above soft limit if we have already at least one message. 
- assert.equal(batchManager.push(smallMessage()), true); + assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); assert.equal(batchManager.length, 1); - assert.equal(batchManager.push(message), false); + assert.equal(batchManager.push(message, /* reentrant */ false), false); assert.equal(batchManager.length, 1); }); @@ -81,15 +81,15 @@ describe("BatchManager", () => { const message = { contents: generateStringOfSize(third) } as any as BatchMessage; // Can push one large message, even above soft limit - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 1); // Can push second large message, even above soft limit - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 2); // Can't push another message - assert.equal(batchManager.push(message), false); + assert.equal(batchManager.push(message, /* reentrant */ false), false); assert.equal(batchManager.length, 2); // Pop and check batch @@ -97,13 +97,13 @@ describe("BatchManager", () => { assert.equal(batch.content.length, 2); // Can push messages again - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 1); - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 2); - assert.equal(batchManager.push(smallMessage()), true); + assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); assert.equal(batchManager.length, 3); }); @@ -116,15 +116,15 @@ describe("BatchManager", () => { } as any as BatchMessage; // Can't push very large message, above hard limit - assert.equal(batchManager.push(largeMessage), false); + assert.equal(batchManager.push(largeMessage, /* reentrant */ false), false); assert.equal(batchManager.length, 0); // Can push one message - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, 1); // Can't push second message - assert.equal(batchManager.push(message), false); + assert.equal(batchManager.push(message, /* reentrant */ false), false); assert.equal(batchManager.length, 1); // Pop and check batch @@ -137,68 +137,173 @@ describe("BatchManager", () => { const batchManager = new BatchManager({ hardLimit: Infinity }); for (let i = 1; i <= 10; i++) { - assert.equal(batchManager.push(message), true); + assert.equal(batchManager.push(message, /* reentrant */ false), true); assert.equal(batchManager.length, i); } }); it("Batch metadata is set correctly", () => { const batchManager = new BatchManager({ hardLimit }); - assert.equal(batchManager.push({ ...smallMessage(), referenceSequenceNumber: 0 }), true); - assert.equal(batchManager.push({ ...smallMessage(), referenceSequenceNumber: 1 }), true); - assert.equal(batchManager.push({ ...smallMessage(), referenceSequenceNumber: 2 }), true); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 0 }, + /* reentrant */ false, + ), + true, + ); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 1 }, + /* reentrant */ false, + ), + true, + ); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 2 }, + /* reentrant */ false, + ), + true, + ); const batch 
= batchManager.popBatch(); assert.equal(batch.content[0].metadata?.batch, true); assert.equal(batch.content[1].metadata?.batch, undefined); assert.equal(batch.content[2].metadata?.batch, false); - assert.equal(batchManager.push({ ...smallMessage(), referenceSequenceNumber: 0 }), true); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 0 }, + /* reentrant */ false, + ), + true, + ); const singleOpBatch = batchManager.popBatch(); assert.equal(singleOpBatch.content[0].metadata?.batch, undefined); }); it("Batch content size is tracked correctly", () => { const batchManager = new BatchManager({ hardLimit }); - assert.equal(batchManager.push(smallMessage()), true); + assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); assert.equal(batchManager.contentSizeInBytes, smallMessageSize * batchManager.length); - assert.equal(batchManager.push(smallMessage()), true); + assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); assert.equal(batchManager.contentSizeInBytes, smallMessageSize * batchManager.length); - assert.equal(batchManager.push(smallMessage()), true); + assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); assert.equal(batchManager.contentSizeInBytes, smallMessageSize * batchManager.length); }); it("Batch reference sequence number maps to the last message", () => { const batchManager = new BatchManager({ hardLimit }); - assert.equal(batchManager.push({ ...smallMessage(), referenceSequenceNumber: 0 }), true); - assert.equal(batchManager.push({ ...smallMessage(), referenceSequenceNumber: 1 }), true); - assert.equal(batchManager.push({ ...smallMessage(), referenceSequenceNumber: 2 }), true); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 0 }, + /* reentrant */ false, + ), + true, + ); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 1 }, + /* reentrant */ false, + ), + true, + ); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 2 }, + /* reentrant */ false, + ), + true, + ); assert.equal(batchManager.sequenceNumbers.referenceSequenceNumber, 2); }); it("Batch size estimates", () => { const batchManager = new BatchManager({ hardLimit }); - batchManager.push(smallMessage()); + batchManager.push(smallMessage(), /* reentrant */ false); // 10 bytes of content + 200 bytes overhead assert.equal(estimateSocketSize(batchManager.popBatch()), 210); for (let i = 0; i < 10; i++) { - batchManager.push(smallMessage()); + batchManager.push(smallMessage(), /* reentrant */ false); } // (10 bytes of content + 200 bytes overhead) x 10 assert.equal(estimateSocketSize(batchManager.popBatch()), 2100); - batchManager.push(smallMessage()); + batchManager.push(smallMessage(), /* reentrant */ false); for (let i = 0; i < 9; i++) { - batchManager.push({ - contents: undefined, - type: ContainerMessageType.FluidDataStoreOp, - } as any as BatchMessage); // empty op + batchManager.push( + { + contents: undefined, + type: ContainerMessageType.FluidDataStoreOp, + } as any as BatchMessage, + /* reentrant */ false, + ); // empty op } // 10 bytes of content + 200 bytes overhead x 10 assert.equal(estimateSocketSize(batchManager.popBatch()), 2010); }); + + it("Batch op reentry state preserved during its lifetime", () => { + const batchManager = new BatchManager({ hardLimit }); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 0 }, + /* reentrant */ false, + ), + true, + ); + 
assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 1 }, + /* reentrant */ false, + ), + true, + ); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 2 }, + /* reentrant */ false, + ), + true, + ); + + assert.equal(batchManager.popBatch().hasReentrantOps, false); + + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 0 }, + /* reentrant */ false, + ), + true, + ); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 1 }, + /* reentrant */ true, + /* currentClientSequenceNumber */ undefined, + ), + true, + ); + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 2 }, + /* reentrant */ false, + ), + true, + ); + assert.equal(batchManager.popBatch().hasReentrantOps, true); + + assert.equal( + batchManager.push( + { ...smallMessage(), referenceSequenceNumber: 0 }, + /* reentrant */ false, + ), + true, + ); + assert.equal(batchManager.popBatch().hasReentrantOps, false); + }); }); diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts index 5c4e53c76418..15c88e0cc8e3 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts @@ -7,6 +7,7 @@ import { strict as assert } from "assert"; import { IBatchMessage, IContainerContext, + ICriticalContainerError, IDeltaManager, } from "@fluidframework/container-definitions"; import { @@ -15,7 +16,7 @@ import { MessageType, } from "@fluidframework/protocol-definitions"; import { MockLogger } from "@fluidframework/telemetry-utils"; -import { PendingStateManager } from "../../pendingStateManager"; +import { IPendingBatchMessage, PendingStateManager } from "../../pendingStateManager"; import { BatchMessage, IBatch, @@ -124,15 +125,13 @@ describe("Outbox", () => { }, }); - const createMessage = (type: ContainerMessageType, contents: string): BatchMessage => { - return { - contents: JSON.stringify({ type, contents }), - type, - metadata: { test: true }, - localOpMetadata: {}, - referenceSequenceNumber: Number.POSITIVE_INFINITY, - }; - }; + const createMessage = (type: ContainerMessageType, contents: string): BatchMessage => ({ + contents: JSON.stringify({ type, contents }), + type, + metadata: { test: true }, + localOpMetadata: {}, + referenceSequenceNumber: Number.POSITIVE_INFINITY, + }); const batchedMessage = ( message: BatchMessage, @@ -168,6 +167,7 @@ describe("Outbox", () => { .reduce((a, b) => a + b, 0), referenceSequenceNumber: messages.length === 0 ? undefined : messages[0].referenceSequenceNumber, + hasReentrantOps: false, }); const DefaultCompressionOptions = { @@ -184,6 +184,7 @@ describe("Outbox", () => { enableChunking?: boolean; disablePartialFlush?: boolean; chunkSizeInBytes?: number; + enableBatchRebasing?: boolean; }) => new Outbox({ shouldSend: () => state.canSendOps, @@ -198,10 +199,14 @@ describe("Outbox", () => { maxBatchSizeInBytes: params.maxBatchSize ?? maxBatchSizeInBytes, compressionOptions: params.compressionOptions ?? DefaultCompressionOptions, disablePartialFlush: params.disablePartialFlush ?? false, + enableBatchRebasing: params.enableBatchRebasing ?? 
false, }, logger: mockLogger, groupingManager: new OpGroupingManager(false), getCurrentSequenceNumbers: () => currentSeqNumbers, + reSubmit: (message: IPendingBatchMessage) => {}, + opReentrancy: () => false, + closeContainer: (error?: ICriticalContainerError) => {}, }); beforeEach(() => { diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index dd146dbe5d28..c9f839170d6c 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -46,7 +46,7 @@ describe("Pending State Manager", () => { }); it("should do nothing when rolling back nothing", () => { - batchManager.push(getMessage("1")); + batchManager.push(getMessage("1"), /* reentrant */ false); const checkpoint = batchManager.checkpoint(); checkpoint.rollback(rollBackCallback); @@ -56,9 +56,9 @@ describe("Pending State Manager", () => { it("should succeed when rolling back entire pending stack", () => { const checkpoint = batchManager.checkpoint(); - batchManager.push(getMessage("11")); - batchManager.push(getMessage("22")); - batchManager.push(getMessage("33")); + batchManager.push(getMessage("11"), /* reentrant */ false); + batchManager.push(getMessage("22"), /* reentrant */ false); + batchManager.push(getMessage("33"), /* reentrant */ false); checkpoint.rollback(rollBackCallback); assert.strictEqual(rollbackCalled, true); @@ -70,10 +70,10 @@ describe("Pending State Manager", () => { }); it("should succeed when rolling back part of pending stack", () => { - batchManager.push(getMessage("11")); + batchManager.push(getMessage("11"), /* reentrant */ false); const checkpoint = batchManager.checkpoint(); - batchManager.push(getMessage("22")); - batchManager.push(getMessage("33")); + batchManager.push(getMessage("22"), /* reentrant */ false); + batchManager.push(getMessage("33"), /* reentrant */ false); checkpoint.rollback(rollBackCallback); assert.strictEqual(rollbackCalled, true); @@ -86,7 +86,7 @@ describe("Pending State Manager", () => { it("should throw and close when rollback fails", () => { rollbackShouldThrow = true; const checkpoint = batchManager.checkpoint(); - batchManager.push(getMessage("11")); + batchManager.push(getMessage("11"), /* reentrant */ false); assert.throws(() => { checkpoint.rollback(rollBackCallback); }); diff --git a/packages/test/test-end-to-end-tests/src/test/opReentrancy.spec.ts b/packages/test/test-end-to-end-tests/src/test/opReentrancy.spec.ts index 3672d5523103..b49f8bc10591 100644 --- a/packages/test/test-end-to-end-tests/src/test/opReentrancy.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/opReentrancy.spec.ts @@ -16,11 +16,18 @@ import { ITestObjectProvider, } from "@fluidframework/test-utils"; import { describeNoCompat, itExpects } from "@fluid-internal/test-version-utils"; +import { SharedString } from "@fluidframework/sequence"; import { IContainer } from "@fluidframework/container-definitions"; +import { IMergeTreeInsertMsg } from "@fluidframework/merge-tree"; +import { FlushMode } from "@fluidframework/runtime-definitions"; describeNoCompat("Concurrent op processing via DDS event handlers", (getTestObjectProvider) => { const mapId = "mapKey"; - const registry: ChannelFactoryRegistry = [[mapId, SharedMap.getFactory()]]; + const sharedStringId = "sharedStringKey"; + const registry: ChannelFactoryRegistry = [ + [mapId, SharedMap.getFactory()], + [sharedStringId, 
SharedString.getFactory()], + ]; const testContainerConfig: ITestContainerConfig = { fluidDataObjectType: DataObjectFactoryType.Test, registry, @@ -32,11 +39,16 @@ describeNoCompat("Concurrent op processing via DDS event handlers", (getTestObje let dataObject2: ITestFluidObject; let sharedMap1: SharedMap; let sharedMap2: SharedMap; + let sharedString1: SharedString; + let sharedString2: SharedString; const configProvider = (settings: Record): IConfigProviderBase => ({ getRawConfig: (name: string): ConfigTypes => settings[name], }); + const mapsAreEqual = (a: SharedMap, b: SharedMap) => + a.size === b.size && [...a.entries()].every(([key, value]) => b.get(key) === value); + beforeEach(async () => { provider = getTestObjectProvider(); }); @@ -60,6 +72,9 @@ describeNoCompat("Concurrent op processing via DDS event handlers", (getTestObje sharedMap1 = await dataObject1.getSharedObject(mapId); sharedMap2 = await dataObject2.getSharedObject(mapId); + sharedString1 = await dataObject1.getSharedObject(sharedStringId); + sharedString2 = await dataObject2.getSharedObject(sharedStringId); + await provider.ensureSynchronized(); }; @@ -98,9 +113,159 @@ describeNoCompat("Concurrent op processing via DDS event handlers", (getTestObje // The other container is fine assert.equal(sharedMap2.get("key1"), undefined); assert.equal(sharedMap2.get("key2"), "2"); + assert.ok(!mapsAreEqual(sharedMap1, sharedMap2)); }, ); + [false, true].forEach((enableGroupedBatching) => { + // ADO:4537 Enable only after rebasing is supported by the DDS + it.skip(`Eventual consistency with op reentry - ${ + enableGroupedBatching ? "Grouped" : "Regular" + } batches`, async () => { + await setupContainers({ + ...testContainerConfig, + runtimeOptions: { + enableGroupedBatching, + enableBatchRebasing: true, + }, + }); + + sharedString1.insertText(0, "ad"); + sharedString1.insertText(1, "c"); + await provider.ensureSynchronized(); + + sharedString2.on("sequenceDelta", (sequenceDeltaEvent) => { + if ((sequenceDeltaEvent.opArgs.op as IMergeTreeInsertMsg).seg === "b") { + sharedString2.insertText(3, "x"); + } + }); + sharedMap2.on("valueChanged", (changed1) => { + if (changed1.key !== "key2" && changed1.key !== "key3") { + sharedMap2.on("valueChanged", (changed2) => { + if (changed2.key !== "key3") { + sharedMap2.set("key3", `${sharedMap1.get("key1")} updated`); + } + }); + + sharedMap2.set("key2", "3"); + } + }); + + sharedMap1.set("key1", "1"); + + sharedString1.insertText(1, "b"); + sharedString2.insertText(0, "y"); + await provider.ensureSynchronized(); + + // The offending container is still alive + sharedString2.insertText(0, "z"); + await provider.ensureSynchronized(); + + assert.strictEqual(sharedString1.getText(), "zyabxcd"); + assert.strictEqual( + sharedString1.getText(), + sharedString2.getText(), + "SharedString eventual consistency broken", + ); + + assert.strictEqual(sharedMap1.get("key1"), "1"); + assert.strictEqual(sharedMap1.get("key2"), "3"); + assert.strictEqual(sharedMap1.get("key3"), "1 updated"); + assert.ok( + mapsAreEqual(sharedMap1, sharedMap2), + "SharedMap eventual consistency broken", + ); + + // Both containers are alive at the end + assert.ok(!container1.closed, "Local container is closed"); + assert.ok(!container2.closed, "Remote container is closed"); + }); + }); + + it("Eventual consistency broken with op reentry, grouped batches and batch rebasing disabled", async () => { + await setupContainers( + { + ...testContainerConfig, + runtimeOptions: { + enableGroupedBatching: true, + enableBatchRebasing: true, 
+ }, + }, + { "Fluid.ContainerRuntime.DisableBatchRebasing": true }, + ); + + sharedString1.insertText(0, "ad"); + await provider.ensureSynchronized(); + + sharedString2.on("sequenceDelta", (sequenceDeltaEvent) => { + if ((sequenceDeltaEvent.opArgs.op as IMergeTreeInsertMsg).seg === "b") { + sharedString2.insertText(3, "x"); + } + }); + + sharedString1.insertText(1, "b"); + sharedString1.insertText(2, "c"); + await provider.ensureSynchronized(); + + assert.notStrictEqual( + sharedString1.getText(), + sharedString2.getText(), + "Unexpected eventual consistency", + ); + }); + + describe("Reentry safeguards", () => { + itExpects( + "Flushing is not supported", + [ + { + eventName: "fluid:telemetry:Container:ContainerClose", + error: "Flushing is not supported inside DDS event handlers", + }, + ], + async () => { + await setupContainers({ + ...testContainerConfig, + runtimeOptions: { + flushMode: FlushMode.Immediate, + }, + }); + + sharedString1.on("sequenceDelta", () => + assert.throws(() => + dataObject1.context.containerRuntime.orderSequentially(() => + sharedMap1.set("0", 0), + ), + ), + ); + + assert.throws(() => sharedString1.insertText(0, "ad")); + await provider.ensureSynchronized(); + }, + ); + + it("Flushing is supported if it happens in the next batch", async () => { + await setupContainers({ + ...testContainerConfig, + runtimeOptions: { + flushMode: FlushMode.Immediate, + }, + }); + + sharedString1.on("sequenceDelta", (sequenceDeltaEvent) => { + if ((sequenceDeltaEvent.opArgs.op as IMergeTreeInsertMsg).seg === "ad") { + void Promise.resolve().then(() => { + sharedString1.insertText(0, "bc"); + }); + } + }); + + sharedString1.insertText(0, "ad"); + await provider.ensureSynchronized(); + assert.strictEqual(sharedString1.getText(), "bcad"); + }); + }); + it("Should throw when submitting an op while handling an event - offline", async () => { await setupContainers({ ...testContainerConfig, @@ -125,30 +290,31 @@ describeNoCompat("Concurrent op processing via DDS event handlers", (getTestObje container1.deltaManager.inbound.resume(); container1.deltaManager.outbound.resume(); + await provider.ensureSynchronized(); + // The offending container is not closed assert.ok(!container1.closed); + assert.ok(!mapsAreEqual(sharedMap1, sharedMap2)); }); - const allowReentry = [ - { - options: testContainerConfig, - featureGates: {}, - name: "Default config and feature gates", - }, - { - options: { - ...testContainerConfig, - runtimeOptions: { - enableOpReentryCheck: true, + describe("Allow reentry", () => + [ + { + options: testContainerConfig, + featureGates: {}, + name: "Default config and feature gates", + }, + { + options: { + ...testContainerConfig, + runtimeOptions: { + enableOpReentryCheck: true, + }, }, + featureGates: { "Fluid.ContainerRuntime.DisableOpReentryCheck": true }, + name: "Enabled by options, disabled by feature gate", }, - featureGates: { "Fluid.ContainerRuntime.DisableOpReentryCheck": true }, - name: "Enabled by options, disabled by feature gate", - }, - ]; - - describe("Allow reentry", () => - allowReentry.forEach((testConfig) => { + ].forEach((testConfig) => { it(`Should not close the container when submitting an op while processing a batch [${testConfig.name}]`, async () => { await setupContainers(testConfig.options, testConfig.featureGates); @@ -177,6 +343,7 @@ describeNoCompat("Concurrent op processing via DDS event handlers", (getTestObje // The second event handler didn't receive the events in the actual order of changes assert.deepEqual(outOfOrderObservations, ["key2", 
"key1"]); + assert.ok(mapsAreEqual(sharedMap1, sharedMap2)); }); it(`Should not throw when submitting an op while processing a batch - offline [${testConfig.name}]`, async () => { @@ -208,12 +375,14 @@ describeNoCompat("Concurrent op processing via DDS event handlers", (getTestObje container1.deltaManager.inbound.resume(); container1.deltaManager.outbound.resume(); + await provider.ensureSynchronized(); // The offending container is not closed assert.ok(!container1.closed); // The second event handler didn't receive the events in the actual order of changes assert.deepEqual(outOfOrderObservations, ["key2", "key1"]); + assert.ok(mapsAreEqual(sharedMap1, sharedMap2)); }); })); }); diff --git a/packages/test/test-service-load/src/optionsMatrix.ts b/packages/test/test-service-load/src/optionsMatrix.ts index d05724351f12..572625745382 100644 --- a/packages/test/test-service-load/src/optionsMatrix.ts +++ b/packages/test/test-service-load/src/optionsMatrix.ts @@ -100,6 +100,7 @@ export function generateRuntimeOptions( chunkSizeInBytes: [204800], enableRuntimeIdCompressor: [undefined, true], enableGroupedBatching: [true, false], + enableBatchRebasing: [true], }; return generatePairwiseOptions(