Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the runtime mocks to support batches and rebasing #16460

Merged
merged 16 commits into from
Jul 25, 2023
23 changes: 20 additions & 3 deletions api-report/test-runtime-utils.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { CreateChildSummarizerNodeFn } from '@fluidframework/runtime-definitions
import { CreateChildSummarizerNodeParam } from '@fluidframework/runtime-definitions';
import { EventEmitter } from 'events';
import { FluidObject } from '@fluidframework/core-interfaces';
import { FlushMode } from '@fluidframework/runtime-definitions';
import { IAudience } from '@fluidframework/container-definitions';
import { IChannel } from '@fluidframework/datastore-definitions';
import { IChannelServices } from '@fluidframework/datastore-definitions';
Expand Down Expand Up @@ -63,6 +64,12 @@ export interface IInsecureUser extends IUser {
name: string;
}

// @public
export interface IMockContainerRuntimeOptions {
readonly enableGroupedBatching?: boolean;
readonly flushMode?: FlushMode;
}

// @public (undocumented)
export interface IMockContainerRuntimePendingMessage {
// (undocumented)
Expand All @@ -87,7 +94,7 @@ export class InsecureTokenProvider implements ITokenProvider {

// @public
export class MockContainerRuntime {
constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactory, overrides?: {
constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactory, mockContainerRuntimeOptions?: IMockContainerRuntimeOptions, overrides?: {
minimumSequenceNumber?: number | undefined;
} | undefined);
// (undocumented)
Expand All @@ -106,6 +113,7 @@ export class MockContainerRuntime {
dirty(): void;
// (undocumented)
protected readonly factory: MockContainerRuntimeFactory;
flush?(): void;
// (undocumented)
protected readonly overrides?: {
minimumSequenceNumber?: number | undefined;
Expand All @@ -114,12 +122,16 @@ export class MockContainerRuntime {
protected readonly pendingMessages: IMockContainerRuntimePendingMessage[];
// (undocumented)
process(message: ISequencedDocumentMessage): void;
rebase?(): void;
protected get referenceSequenceNumber(): number;
protected runtimeOptions: Required<IMockContainerRuntimeOptions>;
// (undocumented)
submit(messageContent: any, localOpMetadata: unknown): number;
}

// @public
export class MockContainerRuntimeFactory {
constructor(mockContainerRuntimeOptions?: IMockContainerRuntimeOptions);
// (undocumented)
createContainerRuntime(dataStoreRuntime: MockFluidDataStoreRuntime): MockContainerRuntime;
// (undocumented)
Expand All @@ -136,6 +148,7 @@ export class MockContainerRuntimeFactory {
pushMessage(msg: Partial<ISequencedDocumentMessage>): void;
// (undocumented)
readonly quorum: MockQuorumClients;
protected readonly runtimeOptions: Required<IMockContainerRuntimeOptions>;
// (undocumented)
protected readonly runtimes: MockContainerRuntime[];
// (undocumented)
Expand All @@ -154,7 +167,7 @@ export class MockContainerRuntimeFactoryForReconnection extends MockContainerRun

// @public
export class MockContainerRuntimeForReconnection extends MockContainerRuntime {
constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactoryForReconnection, overrides?: {
constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactoryForReconnection, runtimeOptions?: IMockContainerRuntimeOptions, overrides?: {
minimumSequenceNumber?: number;
});
// (undocumented)
Expand Down Expand Up @@ -391,8 +404,12 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat
// (undocumented)
readonly connected = true;
// (undocumented)
containerRuntime?: MockContainerRuntime;
// (undocumented)
createChannel(id: string, type: string): IChannel;
// (undocumented)
createDeltaConnection?(): MockDeltaConnection;
// (undocumented)
deltaManager: MockDeltaManager;
// (undocumented)
dispose(): void;
Expand Down Expand Up @@ -444,7 +461,7 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat
// (undocumented)
readonly path = "";
// (undocumented)
process(message: ISequencedDocumentMessage, local: boolean): void;
process(message: ISequencedDocumentMessage, local: boolean, localOpMetadata: unknown): void;
// (undocumented)
processSignal(message: any, local: boolean): void;
// (undocumented)
Expand Down
217 changes: 217 additions & 0 deletions packages/dds/map/src/test/mocha/rebasing.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { strict as assert } from "assert";
import {
MockFluidDataStoreRuntime,
MockContainerRuntimeFactory,
MockContainerRuntime,
MockStorage,
} from "@fluidframework/test-runtime-utils";
import { FlushMode } from "@fluidframework/runtime-definitions";
import { MapFactory, SharedMap } from "../../map";
import { DirectoryFactory, SharedDirectory } from "../../directory";
import { IDirectory } from "../../interfaces";

describe("Rebasing", () => {
let containerRuntimeFactory: MockContainerRuntimeFactory;
let containerRuntime1: MockContainerRuntime;
let containerRuntime2: MockContainerRuntime;

[
{
options: {
flushMode: FlushMode.Immediate,
},
name: "FlushMode immediate",
},
{
options: {
flushMode: FlushMode.TurnBased,
enableGroupedBatching: true,
},
name: "FlushMode TurnBased with grouped batching",
},
].forEach((testConfig) => {
describe(`SharedMap - ${testConfig.name}`, () => {
let map1: SharedMap;
let map2: SharedMap;

beforeEach(async () => {
containerRuntimeFactory = new MockContainerRuntimeFactory(testConfig.options);
const dataStoreRuntime1 = new MockFluidDataStoreRuntime();
containerRuntime1 =
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1);
const services1 = {
deltaConnection: containerRuntime1.createDeltaConnection(),
objectStorage: new MockStorage(),
};
map1 = new SharedMap("shared-map-1", dataStoreRuntime1, MapFactory.Attributes);
map1.connect(services1);

const dataStoreRuntime2 = new MockFluidDataStoreRuntime();
containerRuntime2 =
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2);
const services2 = {
deltaConnection: containerRuntime2.createDeltaConnection(),
objectStorage: new MockStorage(),
};
map2 = new SharedMap("shared-map-2", dataStoreRuntime2, MapFactory.Attributes);
map2.connect(services2);
});

it("Rebasing ops maintains eventual consistency", () => {
assert(containerRuntime1.rebase !== undefined, "Unsupported test-utils version");
assert(containerRuntime1.flush !== undefined, "Unsupported test-utils version");
assert(containerRuntime2.rebase !== undefined, "Unsupported test-utils version");
assert(containerRuntime2.flush !== undefined, "Unsupported test-utils version");

const keyCount = 10;
for (let i = 0; i < keyCount; i++) {
map1.set(`${i}`, map1.size);
}

containerRuntime1.rebase();
containerRuntime1.flush();
containerRuntimeFactory.processAllMessages();

for (let i = 0; i < keyCount; i++) {
assert.strictEqual(map1.get(`${i}`), i);
assert.strictEqual(map2.get(`${i}`), i);
}

const deleteThreshold = 5;
for (let i = 0; i < deleteThreshold - 1; i++) {
map2.delete(`${i}`);
}

map1.delete(`${deleteThreshold - 1}`);

containerRuntime2.rebase();
containerRuntime1.flush();
containerRuntime2.flush();
containerRuntimeFactory.processAllMessages();

for (let i = 0; i < 10; i++) {
const expected = i < deleteThreshold ? undefined : i;
assert.strictEqual(map1.get(`${i}`), expected);
assert.strictEqual(map2.get(`${i}`), expected);
}
});
});

describe(`SharedDirectory - ${testConfig.name}`, () => {
let dir1: SharedDirectory;
let dir2: SharedDirectory;

beforeEach(async () => {
containerRuntimeFactory = new MockContainerRuntimeFactory(testConfig.options);
const dataStoreRuntime1 = new MockFluidDataStoreRuntime();
containerRuntime1 =
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1);
const services1 = {
deltaConnection: containerRuntime1.createDeltaConnection(),
objectStorage: new MockStorage(),
};
dir1 = new SharedDirectory(
"shared-directory-1",
dataStoreRuntime1,
DirectoryFactory.Attributes,
);
dir1.connect(services1);

// Create the second SharedMap.
const dataStoreRuntime2 = new MockFluidDataStoreRuntime();
containerRuntime2 =
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2);
const services2 = {
deltaConnection: containerRuntime2.createDeltaConnection(),
objectStorage: new MockStorage(),
};
dir2 = new SharedDirectory(
"shared-directory-2",
dataStoreRuntime2,
DirectoryFactory.Attributes,
);
dir2.connect(services2);
});

const areDirectoriesEqual = (a: IDirectory | undefined, b: IDirectory | undefined) => {
if (a === undefined || b === undefined) {
assert.strictEqual(a, b, "Both directories should be undefined");
return;
}

const leftKeys = Array.from(a.keys());
const rightKeys = Array.from(b.keys());
assert.strictEqual(
leftKeys.length,
rightKeys.length,
"Number of keys should be the same",
);
leftKeys.forEach((key) => {
assert.strictEqual(a.get(key), b.get(key), "Key values should be the same");
});

const leftSubdirectories = Array.from(a.subdirectories());
const rightSubdirectories = Array.from(b.subdirectories());
assert.strictEqual(
leftSubdirectories.length,
rightSubdirectories.length,
"Number of subdirectories should be the same",
);

leftSubdirectories.forEach(([name]) =>
areDirectoriesEqual(a.getSubDirectory(name), b.getSubDirectory(name)),
);
};

it("Rebasing ops maintains eventual consistency", () => {
assert(containerRuntime1.rebase !== undefined, "Unsupported test-utils version");
assert(containerRuntime1.flush !== undefined, "Unsupported test-utils version");
assert(containerRuntime2.rebase !== undefined, "Unsupported test-utils version");
assert(containerRuntime2.flush !== undefined, "Unsupported test-utils version");

dir2.on("valueChanged", (changed) => {
if (changed.key === "key") {
dir2.set("valueChanged", "valueChanged");
}
});
dir2.on("subDirectoryCreated", () => {
dir2.set("subDirectoryCreated1", "subDirectoryCreated");
dir2.set("subDirectoryCreated2", "subDirectoryCreated");
});
const root1SubDir = dir1.createSubDirectory("testSubDir");
dir2.createSubDirectory("testSubDir");

containerRuntime1.flush();
containerRuntime2.rebase();
containerRuntime2.flush();

root1SubDir.set("key1", "testValue1");
dir1.set("key", "value");
containerRuntime1.flush();
containerRuntimeFactory.processAllMessages();

dir2.deleteSubDirectory("testSubDir");
dir2.createSubDirectory("testSubDir");

containerRuntime2.rebase();
containerRuntime2.flush();
containerRuntimeFactory.processAllMessages();

const directory1SubDir = dir1.getSubDirectory("testSubDir");
const directory2SubDir = dir2.getSubDirectory("testSubDir");

assert(directory1SubDir !== undefined, "SubDirectory on dir 1 should be present");
assert(directory2SubDir !== undefined, "SubDirectory on dir 2 should be present");

assert.strictEqual(directory1SubDir.size, 0, "Dir 1 no key should exist");
assert.strictEqual(directory2SubDir.size, 0, "Dir 2 no key should exist");
areDirectoriesEqual(dir1, dir2);
});
});
});
});
Loading