diff --git a/api-report/test-runtime-utils.api.md b/api-report/test-runtime-utils.api.md index ca89318cf1e5..92c0acc74aaf 100644 --- a/api-report/test-runtime-utils.api.md +++ b/api-report/test-runtime-utils.api.md @@ -115,6 +115,8 @@ export class MockContainerRuntime { // (undocumented) process(message: ISequencedDocumentMessage): void; // (undocumented) + protected get referenceSequenceNumber(): number; + // (undocumented) submit(messageContent: any, localOpMetadata: unknown): number; } @@ -142,6 +144,20 @@ export class MockContainerRuntimeFactory { sequenceNumber: number; } +// @public +export class MockContainerRuntimeFactoryForRebasing extends MockContainerRuntimeFactory { + // (undocumented) + createContainerRuntime(dataStoreRuntime: MockFluidDataStoreRuntime, overrides?: { + minimumSequenceNumber?: number; + }): MockContainerRuntimeForRebasing; + // (undocumented) + processAllMessages(): void; + // (undocumented) + processOneMessage(): void; + // (undocumented) + processSomeMessages(count: number): void; +} + // @public export class MockContainerRuntimeFactoryForReconnection extends MockContainerRuntimeFactory { // (undocumented) @@ -152,6 +168,21 @@ export class MockContainerRuntimeFactoryForReconnection extends MockContainerRun }): MockContainerRuntimeForReconnection; } +// @public +export class MockContainerRuntimeForRebasing extends MockContainerRuntime { + constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactoryForRebasing, overrides?: { + minimumSequenceNumber?: number; + }); + // (undocumented) + flush(): void; + // (undocumented) + process(message: ISequencedDocumentMessage): void; + // (undocumented) + rebase(): void; + // (undocumented) + submit(messageContent: any, localOpMetadata: unknown): number; +} + // @public export class MockContainerRuntimeForReconnection extends MockContainerRuntime { constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactoryForReconnection, overrides?: { @@ -391,8 +422,12 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat // (undocumented) readonly connected = true; // (undocumented) + containerRuntime?: MockContainerRuntime; + // (undocumented) createChannel(id: string, type: string): IChannel; // (undocumented) + createDeltaConnection?(): MockDeltaConnection; + // (undocumented) deltaManager: MockDeltaManager; // (undocumented) dispose(): void; @@ -444,7 +479,7 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat // (undocumented) readonly path = ""; // (undocumented) - process(message: ISequencedDocumentMessage, local: boolean): void; + process(message: ISequencedDocumentMessage, local: boolean, localOpMetadata: unknown): void; // (undocumented) processSignal(message: any, local: boolean): void; // (undocumented) diff --git a/packages/dds/map/src/test/mocha/rebasing.spec.ts b/packages/dds/map/src/test/mocha/rebasing.spec.ts new file mode 100644 index 000000000000..3b124a6ef79b --- /dev/null +++ b/packages/dds/map/src/test/mocha/rebasing.spec.ts @@ -0,0 +1,186 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { + MockFluidDataStoreRuntime, + MockContainerRuntimeFactoryForRebasing, + MockContainerRuntimeForRebasing, + MockStorage, +} from "@fluidframework/test-runtime-utils"; +import { MapFactory, SharedMap } from "../../map"; +import { DirectoryFactory, SharedDirectory } from "../../directory"; +import { IDirectory } from "../../interfaces"; + +describe("Rebasing", () => { + let containerRuntimeFactory: MockContainerRuntimeFactoryForRebasing; + let containerRuntime1: MockContainerRuntimeForRebasing; + let containerRuntime2: MockContainerRuntimeForRebasing; + + describe("SharedMap", () => { + let map1: SharedMap; + let map2: SharedMap; + + beforeEach(async () => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForRebasing(); + const dataStoreRuntime1 = new MockFluidDataStoreRuntime(); + containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1); + const services1 = { + deltaConnection: containerRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }; + map1 = new SharedMap("shared-map-1", dataStoreRuntime1, MapFactory.Attributes); + map1.connect(services1); + + const dataStoreRuntime2 = new MockFluidDataStoreRuntime(); + containerRuntime2 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2); + const services2 = { + deltaConnection: containerRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }; + map2 = new SharedMap("shared-map-2", dataStoreRuntime2, MapFactory.Attributes); + map2.connect(services2); + }); + + it("Rebasing ops maintains eventual consistency", () => { + const keyCount = 10; + for (let i = 0; i < keyCount; i++) { + map1.set(`${i}`, map1.size); + } + + containerRuntime1.rebase(); + containerRuntime1.flush(); + containerRuntimeFactory.processAllMessages(); + + for (let i = 0; i < keyCount; i++) { + assert.strictEqual(map1.get(`${i}`), i); + assert.strictEqual(map2.get(`${i}`), i); + } + + const deleteThreshold = 5; + for (let i = 0; i < deleteThreshold - 1; i++) { + map2.delete(`${i}`); + } + + map1.delete(`${deleteThreshold - 1}`); + + containerRuntime2.rebase(); + containerRuntime1.flush(); + containerRuntime2.flush(); + containerRuntimeFactory.processAllMessages(); + + for (let i = 0; i < 10; i++) { + const expected = i < deleteThreshold ? undefined : i; + assert.strictEqual(map1.get(`${i}`), expected); + assert.strictEqual(map2.get(`${i}`), expected); + } + }); + }); + + describe("SharedDirectory", () => { + let dir1: SharedDirectory; + let dir2: SharedDirectory; + + beforeEach(async () => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForRebasing(); + const dataStoreRuntime1 = new MockFluidDataStoreRuntime(); + containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1); + const services1 = { + deltaConnection: containerRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }; + dir1 = new SharedDirectory( + "shared-directory-1", + dataStoreRuntime1, + DirectoryFactory.Attributes, + ); + dir1.connect(services1); + + // Create the second SharedMap. + const dataStoreRuntime2 = new MockFluidDataStoreRuntime(); + containerRuntime2 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2); + const services2 = { + deltaConnection: containerRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }; + dir2 = new SharedDirectory( + "shared-directory-2", + dataStoreRuntime2, + DirectoryFactory.Attributes, + ); + dir2.connect(services2); + }); + + const areDirectoriesEqual = (a: IDirectory | undefined, b: IDirectory | undefined) => { + if (a === undefined || b === undefined) { + assert.strictEqual(a, b, "Both directories should be undefined"); + return; + } + + const leftKeys = Array.from(a.keys()); + const rightKeys = Array.from(b.keys()); + assert.strictEqual( + leftKeys.length, + rightKeys.length, + "Number of keys should be the same", + ); + leftKeys.forEach((key) => { + assert.strictEqual(a.get(key), b.get(key), "Key values should be the same"); + }); + + const leftSubdirectories = Array.from(a.subdirectories()); + const rightSubdirectories = Array.from(b.subdirectories()); + assert.strictEqual( + leftSubdirectories.length, + rightSubdirectories.length, + "Number of subdirectories should be the same", + ); + + leftSubdirectories.forEach(([name]) => + areDirectoriesEqual(a.getSubDirectory(name), b.getSubDirectory(name)), + ); + }; + + it("Rebasing ops maintains eventual consistency", () => { + dir2.on("valueChanged", (changed) => { + if (changed.key === "key") { + dir2.set("valueChanged", "valueChanged"); + } + }); + dir2.on("subDirectoryCreated", () => { + dir2.set("subDirectoryCreated1", "subDirectoryCreated"); + dir2.set("subDirectoryCreated2", "subDirectoryCreated"); + containerRuntime2.rebase(); + }); + const root1SubDir = dir1.createSubDirectory("testSubDir"); + dir2.createSubDirectory("testSubDir"); + + containerRuntime1.flush(); + containerRuntime2.flush(); + + root1SubDir.set("key1", "testValue1"); + dir1.set("key", "value"); + containerRuntime1.flush(); + containerRuntimeFactory.processAllMessages(); + + dir2.deleteSubDirectory("testSubDir"); + dir2.createSubDirectory("testSubDir"); + + containerRuntime2.rebase(); + containerRuntime2.flush(); + containerRuntimeFactory.processAllMessages(); + + const directory1SubDir = dir1.getSubDirectory("testSubDir"); + const directory2SubDir = dir2.getSubDirectory("testSubDir"); + + assert(directory1SubDir !== undefined, "SubDirectory on dir 1 should be present"); + assert(directory2SubDir !== undefined, "SubDirectory on dir 2 should be present"); + + assert.strictEqual(directory1SubDir.size, 0, "Dir 1 no key should exist"); + assert.strictEqual(directory2SubDir.size, 0, "Dir 2 no key should exist"); + areDirectoriesEqual(dir1, dir2); + }); + }); +}); diff --git a/packages/dds/sequence/src/test/rebasing.spec.ts b/packages/dds/sequence/src/test/rebasing.spec.ts new file mode 100644 index 000000000000..a5c5378a11f8 --- /dev/null +++ b/packages/dds/sequence/src/test/rebasing.spec.ts @@ -0,0 +1,91 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { + MockFluidDataStoreRuntime, + MockContainerRuntimeFactoryForRebasing, + MockContainerRuntimeForRebasing, + MockStorage, +} from "@fluidframework/test-runtime-utils"; +import { IMergeTreeInsertMsg } from "@fluidframework/merge-tree"; +import { SharedString } from "../sharedString"; +import { SharedStringFactory } from "../sequenceFactory"; + +describe("Rebasing", () => { + let containerRuntimeFactory: MockContainerRuntimeFactoryForRebasing; + let containerRuntime1: MockContainerRuntimeForRebasing; + let containerRuntime2: MockContainerRuntimeForRebasing; + let sharedString1: SharedString; + let sharedString2: SharedString; + + const createSharedString = async ( + id: string, + factory: MockContainerRuntimeFactoryForRebasing, + ): Promise<[SharedString, MockContainerRuntimeForRebasing]> => { + const dataStoreRuntime = new MockFluidDataStoreRuntime(); + dataStoreRuntime.local = false; + const containerRuntime = factory.createContainerRuntime(dataStoreRuntime); + const services = { + deltaConnection: containerRuntime.createDeltaConnection(), + objectStorage: new MockStorage(), + }; + const sharedString = new SharedString(dataStoreRuntime, id, SharedStringFactory.Attributes); + sharedString.initializeLocal(); + sharedString.connect(services); + return [sharedString, containerRuntime]; + }; + + beforeEach(async () => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForRebasing(); + [sharedString1, containerRuntime1] = await createSharedString( + "shared-string-1", + containerRuntimeFactory, + ); + [sharedString2, containerRuntime2] = await createSharedString( + "shared-string-2", + containerRuntimeFactory, + ); + }); + + it("Rebasing ops maintains eventual consistency", async () => { + sharedString1.insertText(0, "ad"); + sharedString1.insertText(1, "c"); + containerRuntime1.flush(); + containerRuntime2.flush(); + containerRuntimeFactory.processAllMessages(); + + sharedString2.on("sequenceDelta", (sequenceDeltaEvent) => { + if ((sequenceDeltaEvent.opArgs.op as IMergeTreeInsertMsg).seg === "b") { + sharedString2.insertText(3, "u"); + containerRuntime2.rebase(); + } + }); + + sharedString1.insertText(1, "b"); + sharedString2.insertText(0, "y"); + sharedString2.insertText(1, "w"); + containerRuntime1.flush(); + containerRuntimeFactory.processOneMessage(); + sharedString2.insertText(2, "v"); + + containerRuntime2.rebase(); + containerRuntime1.flush(); + containerRuntime2.flush(); + containerRuntimeFactory.processAllMessages(); + + sharedString2.insertText(0, "z"); + containerRuntime1.flush(); + containerRuntime2.flush(); + containerRuntimeFactory.processAllMessages(); + + assert.strictEqual(sharedString1.getText(), "zywvaubcd"); + assert.strictEqual( + sharedString1.getText(), + sharedString2.getText(), + "SharedString eventual consistency broken", + ); + }); +}); diff --git a/packages/runtime/test-runtime-utils/src/index.ts b/packages/runtime/test-runtime-utils/src/index.ts index b3b2971a9069..c95709e37603 100644 --- a/packages/runtime/test-runtime-utils/src/index.ts +++ b/packages/runtime/test-runtime-utils/src/index.ts @@ -23,5 +23,9 @@ export { MockContainerRuntimeFactoryForReconnection, MockContainerRuntimeForReconnection, } from "./mocksForReconnection"; +export { + MockContainerRuntimeFactoryForRebasing, + MockContainerRuntimeForRebasing, +} from "./mocksForRebasing"; export { MockStorage } from "./mockStorage"; export { validateAssertionError } from "./validateAssertionError"; diff --git a/packages/runtime/test-runtime-utils/src/mocks.ts b/packages/runtime/test-runtime-utils/src/mocks.ts index a27ce5109303..4f8ec407a213 100644 --- a/packages/runtime/test-runtime-utils/src/mocks.ts +++ b/packages/runtime/test-runtime-utils/src/mocks.ts @@ -124,17 +124,18 @@ export class MockContainerRuntime { // Set FluidDataStoreRuntime's deltaManager to ours so that they are in sync. this.dataStoreRuntime.deltaManager = this.deltaManager; this.dataStoreRuntime.quorum = factory.quorum; + this.dataStoreRuntime.containerRuntime = this; // FluidDataStoreRuntime already creates a clientId, reuse that so they are in sync. this.clientId = this.dataStoreRuntime.clientId ?? uuid(); factory.quorum.addMember(this.clientId, {}); } public createDeltaConnection(): MockDeltaConnection { - const deltaConnection = new MockDeltaConnection( - (messageContent: any, localOpMetadata: unknown) => - this.submit(messageContent, localOpMetadata), - () => this.dirty(), + assert( + this.dataStoreRuntime.createDeltaConnection !== undefined, + "Unsupported datastore runtime version", ); + const deltaConnection = this.dataStoreRuntime.createDeltaConnection(); this.deltaConnections.push(deltaConnection); return deltaConnection; } @@ -145,7 +146,7 @@ export class MockContainerRuntime { clientId: this.clientId, clientSequenceNumber, contents: messageContent, - referenceSequenceNumber: this.deltaManager.lastSequenceNumber, + referenceSequenceNumber: this.referenceSequenceNumber, type: MessageType.Operation, }; this.factory.pushMessage(msg); @@ -162,9 +163,7 @@ export class MockContainerRuntime { this.deltaManager.lastMessage = message; this.deltaManager.minimumSequenceNumber = message.minimumSequenceNumber; const [local, localOpMetadata] = this.processInternal(message); - this.deltaConnections.forEach((dc) => { - dc.process(message, local, localOpMetadata); - }); + this.dataStoreRuntime.process(message, local, localOpMetadata); } protected addPendingMessage( @@ -193,6 +192,10 @@ export class MockContainerRuntime { } return [local, localOpMetadata]; } + + protected get referenceSequenceNumber() { + return this.deltaManager.lastSequenceNumber; + } } /** @@ -432,6 +435,17 @@ export class MockFluidDataStoreRuntime "fluid:MockFluidDataStoreRuntime", ); public quorum = new MockQuorumClients(); + public containerRuntime?: MockContainerRuntime; + private readonly deltaConnections: MockDeltaConnection[] = []; + public createDeltaConnection?(): MockDeltaConnection { + const deltaConnection = new MockDeltaConnection( + (messageContent: any, localOpMetadata: unknown) => + this.submitMessageInternal(messageContent, localOpMetadata), + () => this.setChannelDirty(), + ); + this.deltaConnections.push(deltaConnection); + return deltaConnection; + } public ensureNoDataModelChanges(callback: () => T): T { return callback(); @@ -520,16 +534,34 @@ export class MockFluidDataStoreRuntime return null; } + private submitMessageInternal(messageContent: any, localOpMetadata: unknown): number { + assert( + this.containerRuntime !== undefined, + "The container runtime has not been initialized", + ); + return this.containerRuntime.submit(messageContent, localOpMetadata); + } + public submitMessage(type: MessageType, content: any) { return null; } + private setChannelDirty(): void { + assert( + this.containerRuntime !== undefined, + "The container runtime has not been initialized", + ); + return this.containerRuntime.dirty(); + } + public submitSignal(type: string, content: any) { return null; } - public process(message: ISequencedDocumentMessage, local: boolean): void { - return; + public process(message: ISequencedDocumentMessage, local: boolean, localOpMetadata: unknown) { + this.deltaConnections.forEach((dc) => { + dc.process(message, local, localOpMetadata); + }); } public processSignal(message: any, local: boolean) { @@ -606,12 +638,15 @@ export class MockFluidDataStoreRuntime } public reSubmit(content: any, localOpMetadata: unknown) { - return; + this.deltaConnections.forEach((dc) => { + dc.reSubmit(content, localOpMetadata); + }); } public async applyStashedOp(content: any) { return; } + public rollback?(message: any, localOpMetadata: unknown): void { return; } diff --git a/packages/runtime/test-runtime-utils/src/mocksForRebasing.ts b/packages/runtime/test-runtime-utils/src/mocksForRebasing.ts new file mode 100644 index 000000000000..4fb556745234 --- /dev/null +++ b/packages/runtime/test-runtime-utils/src/mocksForRebasing.ts @@ -0,0 +1,133 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { ISequencedDocumentMessage, MessageType } from "@fluidframework/protocol-definitions"; +import { + MockContainerRuntime, + MockContainerRuntimeFactory, + MockFluidDataStoreRuntime, +} from "./mocks"; + +/** + * Specialized implementation of MockContainerRuntime for testing op rebasing, when the runtime will resend + * ops to the datastores and all ops within the same batch will have the same sequence number. + */ +export class MockContainerRuntimeForRebasing extends MockContainerRuntime { + private readonly outbox: InternalMessage[] = []; + + constructor( + dataStoreRuntime: MockFluidDataStoreRuntime, + factory: MockContainerRuntimeFactoryForRebasing, + overrides?: { minimumSequenceNumber?: number }, + ) { + super(dataStoreRuntime, factory, overrides); + } + + public process(message: ISequencedDocumentMessage) { + super.process(message); + + // We've processed something, therefore the current batch has ended + this.clientSequenceNumber++; + } + + public submit(messageContent: any, localOpMetadata: unknown) { + this.outbox.push({ + content: messageContent, + localOpMetadata, + }); + + // Messages in the same batch will have the same clientSequenceNumber + return this.clientSequenceNumber; + } + + public flush() { + while (this.outbox.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.submitInternal(this.outbox.shift()!); + } + } + + private submitInternal(message: InternalMessage) { + this.factory.pushMessage({ + clientId: this.clientId, + clientSequenceNumber: this.clientSequenceNumber, + contents: message.content, + referenceSequenceNumber: this.referenceSequenceNumber, + type: MessageType.Operation, + }); + this.addPendingMessage(message.content, message.localOpMetadata, this.clientSequenceNumber); + } + + public rebase() { + const messagesToRebase = this.outbox.slice(); + this.outbox.length = 0; + + messagesToRebase.forEach((message) => + this.dataStoreRuntime.reSubmit(message.content, message.localOpMetadata), + ); + } +} + +interface InternalMessage { + content: any; + localOpMetadata: unknown; +} + +/** + * Specialized implementation of MockContainerRuntimeFactory for testing op rebasing. + */ +export class MockContainerRuntimeFactoryForRebasing extends MockContainerRuntimeFactory { + public createContainerRuntime( + dataStoreRuntime: MockFluidDataStoreRuntime, + overrides?: { minimumSequenceNumber?: number }, + ): MockContainerRuntimeForRebasing { + const containerRuntime = new MockContainerRuntimeForRebasing( + dataStoreRuntime, + this, + overrides, + ); + this.runtimes.push(containerRuntime); + return containerRuntime; + } + + private processMessage() { + if (this.messages.length === 0) { + throw new Error("Tried to process a message that did not exist"); + } + + // Explicitly JSON clone the value to match the behavior of going thru the wire. + const message = JSON.parse( + JSON.stringify(this.messages.shift()), + ) as ISequencedDocumentMessage; + + this.minSeq.set(message.clientId, message.referenceSequenceNumber); + // Messages from the same batch have the same sequence number + message.sequenceNumber = this.sequenceNumber; + message.minimumSequenceNumber = this.getMinSeq(); + for (const runtime of this.runtimes) { + runtime.process(message); + } + } + + public processOneMessage() { + // Increase the sequence number between batches + this.sequenceNumber++; + this.processMessage(); + } + + public processSomeMessages(count: number) { + // Increase the sequence number between batches + this.sequenceNumber++; + for (let i = 0; i < count; i++) { + this.processMessage(); + } + } + + public processAllMessages() { + // Increase the sequence number between batches + this.sequenceNumber++; + this.processSomeMessages(this.messages.length); + } +} diff --git a/packages/runtime/test-runtime-utils/src/mocksForReconnection.ts b/packages/runtime/test-runtime-utils/src/mocksForReconnection.ts index 2982c73e069e..e60d4bdda47b 100644 --- a/packages/runtime/test-runtime-utils/src/mocksForReconnection.ts +++ b/packages/runtime/test-runtime-utils/src/mocksForReconnection.ts @@ -14,7 +14,7 @@ import { } from "./mocks"; /** - * Specalized implementation of MockContainerRuntime for testing ops during reconnection. + * Specialized implementation of MockContainerRuntime for testing ops during reconnection. */ export class MockContainerRuntimeForReconnection extends MockContainerRuntime { /** @@ -96,16 +96,14 @@ export class MockContainerRuntimeForReconnection extends MockContainerRuntime { pendingMessage !== undefined, "this is impossible due to the above length check", ); - this.deltaConnections.forEach((dc) => { - dc.reSubmit(pendingMessage.content, pendingMessage.localOpMetadata); - }); + this.dataStoreRuntime.reSubmit(pendingMessage.content, pendingMessage.localOpMetadata); messageCount--; } } } /** - * Specalized implementation of MockContainerRuntimeFactory for testing ops during reconnection. + * Specialized implementation of MockContainerRuntimeFactory for testing ops during reconnection. */ export class MockContainerRuntimeFactoryForReconnection extends MockContainerRuntimeFactory { public createContainerRuntime(