-
Notifications
You must be signed in to change notification settings - Fork 532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support rebasing ops for testing. Part 1, the mock. #16163
Changes from 17 commits
221137f
c24f3e8
e31d309
c62d1d4
748631d
309fb58
8e8bf82
b3541be
52f6329
f7ec14e
140ce54
9525c37
a349de8
3eb38b3
0f6f330
c22db7c
2595ef9
2345a19
e7f183b
b381f26
0bb980a
59984e5
b6b7c2a
dbff9a9
81a64a1
5031292
5579681
8544f04
0a04870
c6be0d9
35586b0
14b3fd8
980360d
90922d9
7b5dde6
12795b1
155f56d
9dee177
a69c719
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -115,6 +115,8 @@ export class MockContainerRuntime { | |
// (undocumented) | ||
process(message: ISequencedDocumentMessage): void; | ||
// (undocumented) | ||
protected get referenceSequenceNumber(): number; | ||
// (undocumented) | ||
submit(messageContent: any, localOpMetadata: unknown): number; | ||
} | ||
|
||
|
@@ -142,6 +144,20 @@ export class MockContainerRuntimeFactory { | |
sequenceNumber: number; | ||
} | ||
|
||
// @public | ||
export class MockContainerRuntimeFactoryForRebasing extends MockContainerRuntimeFactory { | ||
// (undocumented) | ||
createContainerRuntime(dataStoreRuntime: MockFluidDataStoreRuntime, overrides?: { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just create() is good :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From |
||
minimumSequenceNumber?: number; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While I can see why you might want to create shape like that for last arg, I'd rather switch to this form when it's needed. Optional inside of optional is not great design pattern :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is all existing code and I agree it can be refactored into a better shape. However, I don't believe this change should address that. |
||
}): MockContainerRuntimeForRebasing; | ||
// (undocumented) | ||
processAllMessages(): void; | ||
// (undocumented) | ||
processOneMessage(): void; | ||
// (undocumented) | ||
processSomeMessages(count: number): void; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you merge these 3 APIs into one? with count being optional (meaning all)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's outside the scope of this change. These methods are coming from |
||
} | ||
|
||
// @public | ||
export class MockContainerRuntimeFactoryForReconnection extends MockContainerRuntimeFactory { | ||
// (undocumented) | ||
|
@@ -152,6 +168,19 @@ export class MockContainerRuntimeFactoryForReconnection extends MockContainerRun | |
}): MockContainerRuntimeForReconnection; | ||
} | ||
|
||
// @public | ||
export class MockContainerRuntimeForRebasing extends MockContainerRuntime { | ||
constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactoryForRebasing, overrides?: { | ||
minimumSequenceNumber?: number; | ||
}); | ||
// (undocumented) | ||
process(message: ISequencedDocumentMessage): void; | ||
// (undocumented) | ||
rebase(): void; | ||
// (undocumented) | ||
submit(messageContent: any, localOpMetadata: unknown): number; | ||
} | ||
|
||
// @public | ||
export class MockContainerRuntimeForReconnection extends MockContainerRuntime { | ||
constructor(dataStoreRuntime: MockFluidDataStoreRuntime, factory: MockContainerRuntimeFactoryForReconnection, overrides?: { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/*! | ||
* Copyright (c) Microsoft Corporation and contributors. All rights reserved. | ||
* Licensed under the MIT License. | ||
*/ | ||
|
||
import { strict as assert } from "assert"; | ||
import { | ||
MockFluidDataStoreRuntime, | ||
MockContainerRuntimeFactoryForRebasing, | ||
MockContainerRuntimeForRebasing, | ||
MockStorage, | ||
} from "@fluidframework/test-runtime-utils"; | ||
import { MapFactory, AttributableMap } from "../../map"; | ||
|
||
describe("Rebasing", () => { | ||
let containerRuntimeFactory: MockContainerRuntimeFactoryForRebasing; | ||
let containerRuntime1: MockContainerRuntimeForRebasing; | ||
let containerRuntime2: MockContainerRuntimeForRebasing; | ||
let map1: AttributableMap; | ||
let map2: AttributableMap; | ||
|
||
beforeEach(async () => { | ||
containerRuntimeFactory = new MockContainerRuntimeFactoryForRebasing(); | ||
|
||
// Create the first SharedMap. | ||
const dataStoreRuntime1 = new MockFluidDataStoreRuntime(); | ||
containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1); | ||
const services1 = { | ||
deltaConnection: containerRuntime1.createDeltaConnection(), | ||
objectStorage: new MockStorage(), | ||
}; | ||
map1 = new AttributableMap("shared-map-1", dataStoreRuntime1, MapFactory.Attributes); | ||
map1.connect(services1); | ||
|
||
// Create the second SharedMap. | ||
const dataStoreRuntime2 = new MockFluidDataStoreRuntime(); | ||
containerRuntime2 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2); | ||
const services2 = { | ||
deltaConnection: containerRuntime2.createDeltaConnection(), | ||
objectStorage: new MockStorage(), | ||
}; | ||
map2 = new AttributableMap("shared-map-2", dataStoreRuntime2, MapFactory.Attributes); | ||
map2.connect(services2); | ||
}); | ||
|
||
it("Rebasing ops maintains eventual consistency", async () => { | ||
const keyCount = 10; | ||
for (let i = 0; i < keyCount; i++) { | ||
map1.set(`${i}`, map1.size); | ||
} | ||
|
||
containerRuntime1.rebase(); | ||
containerRuntimeFactory.processAllMessages(); | ||
|
||
for (let i = 0; i < keyCount; i++) { | ||
assert.strictEqual(map1.get(`${i}`), i); | ||
assert.strictEqual(map2.get(`${i}`), i); | ||
} | ||
|
||
const deleteThreshold = 5; | ||
for (let i = 0; i < deleteThreshold - 1; i++) { | ||
map2.delete(`${i}`); | ||
} | ||
|
||
map1.delete(`${deleteThreshold - 1}`); | ||
|
||
containerRuntime2.rebase(); | ||
containerRuntimeFactory.processAllMessages(); | ||
|
||
for (let i = 0; i < 10; i++) { | ||
const expected = i < deleteThreshold ? undefined : i; | ||
assert.strictEqual(map1.get(`${i}`), expected); | ||
assert.strictEqual(map2.get(`${i}`), expected); | ||
} | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
/*! | ||
* Copyright (c) Microsoft Corporation and contributors. All rights reserved. | ||
* Licensed under the MIT License. | ||
*/ | ||
|
||
import { v4 as uuid } from "uuid"; | ||
import { ISequencedDocumentMessage, MessageType } from "@fluidframework/protocol-definitions"; | ||
import { | ||
MockContainerRuntime, | ||
MockContainerRuntimeFactory, | ||
MockFluidDataStoreRuntime, | ||
} from "./mocks"; | ||
|
||
/** | ||
* Specialized implementation of MockContainerRuntime for testing op rebasing, when the runtime will resend | ||
* ops to the datastores and all ops within the same batch will have the same sequence number. | ||
*/ | ||
export class MockContainerRuntimeForRebasing extends MockContainerRuntime { | ||
private readonly currentBatch: ITrackableMessage[] = []; | ||
|
||
constructor( | ||
dataStoreRuntime: MockFluidDataStoreRuntime, | ||
factory: MockContainerRuntimeFactoryForRebasing, | ||
overrides?: { minimumSequenceNumber?: number }, | ||
) { | ||
super(dataStoreRuntime, factory, overrides); | ||
} | ||
|
||
public process(message: ISequencedDocumentMessage) { | ||
// Processing ops will happen in a separate JS turn, so by then, we'd increase | ||
// the sequence number and flush the current batch. | ||
this.clientSequenceNumber++; | ||
this.currentBatch.splice(0); | ||
|
||
super.process(message); | ||
andre4i marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
public submit(messageContent: any, localOpMetadata: unknown) { | ||
const message = { | ||
content: messageContent, | ||
localOpMetadata, | ||
opId: uuid(), | ||
timesSubmitted: 0, | ||
}; | ||
this.submitInternal(message); | ||
this.currentBatch.push(message); | ||
|
||
// Messages in the same batch will have the same clientSequenceNumber | ||
return this.clientSequenceNumber; | ||
andre4i marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private submitInternal(message: ITrackableMessage) { | ||
message.timesSubmitted++; | ||
|
||
const metadata = { opId: message.opId, timesSubmitted: message.timesSubmitted }; | ||
this.factory.pushMessage({ | ||
clientId: this.clientId, | ||
clientSequenceNumber: this.clientSequenceNumber, | ||
contents: message.content, | ||
referenceSequenceNumber: this.referenceSequenceNumber, | ||
type: MessageType.Operation, | ||
metadata, | ||
}); | ||
this.addPendingMessage( | ||
message.content, | ||
{ ...(message.localOpMetadata as object), ...metadata }, | ||
this.clientSequenceNumber, | ||
); | ||
} | ||
|
||
public rebase() { | ||
this.currentBatch.forEach((message) => this.submitInternal(message)); | ||
andre4i marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
/** | ||
* To help debugging eventual consistency tests, all ops produced by this mock | ||
* can be tracked using an unique id. The tracking information will also be included | ||
* in the message metadata and local op metadata. | ||
*/ | ||
interface ITrackableMessage { | ||
/** | ||
* Message content | ||
*/ | ||
content: any; | ||
/** | ||
* local op metadata | ||
*/ | ||
localOpMetadata: unknown; | ||
/** | ||
* Unique identifier | ||
*/ | ||
opId: string; | ||
/** | ||
* How many times has this op been resubmitted | ||
*/ | ||
timesSubmitted: number; | ||
} | ||
|
||
/** | ||
* Specialized implementation of MockContainerRuntimeFactory for testing op rebasing. | ||
*/ | ||
export class MockContainerRuntimeFactoryForRebasing extends MockContainerRuntimeFactory { | ||
public createContainerRuntime( | ||
dataStoreRuntime: MockFluidDataStoreRuntime, | ||
overrides?: { minimumSequenceNumber?: number }, | ||
): MockContainerRuntimeForRebasing { | ||
const containerRuntime = new MockContainerRuntimeForRebasing( | ||
dataStoreRuntime, | ||
this, | ||
overrides, | ||
); | ||
this.runtimes.push(containerRuntime); | ||
return containerRuntime; | ||
} | ||
|
||
private processMessage() { | ||
if (this.messages.length === 0) { | ||
throw new Error("Tried to process a message that did not exist"); | ||
} | ||
|
||
// Explicitly JSON clone the value to match the behavior of going thru the wire. | ||
const message = JSON.parse( | ||
JSON.stringify(this.messages.shift()), | ||
) as ISequencedDocumentMessage; | ||
|
||
this.minSeq.set(message.clientId, message.referenceSequenceNumber); | ||
// Messages from the same batch have the same sequence number | ||
message.sequenceNumber = this.sequenceNumber; | ||
message.minimumSequenceNumber = this.getMinSeq(); | ||
for (const runtime of this.runtimes) { | ||
runtime.process(message); | ||
} | ||
} | ||
|
||
public processOneMessage() { | ||
// Increase the sequence number between batches | ||
this.sequenceNumber++; | ||
this.processMessage(); | ||
} | ||
|
||
public processSomeMessages(count: number) { | ||
// Increase the sequence number between batches | ||
this.sequenceNumber++; | ||
for (let i = 0; i < count; i++) { | ||
this.processMessage(); | ||
} | ||
} | ||
|
||
public processAllMessages() { | ||
// Increase the sequence number between batches | ||
this.sequenceNumber++; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not clear why this.sequenceNumber is incremented differently if I call this API vs. call processOneMessage() multiple times There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. process* 'splits' the batch, so it would be equivalent to to a JS turn |
||
this.processSomeMessages(this.messages.length); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd make this mock either assert or work correctly in presence of re-entrancy There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I understand. That's not the intention here, the mock shouldn't assert, that's the responsibility of the test using it. I think there's a misunderstanding here about how these mocks work. These mocks are of a more manual nature as opposed to automatically detecting if there's reentry. So, we can simulate what happens at reentry but we won't do it automatically. That would imply we're reimplementing the runtime logic and if we're doing so, we might as well have the tests run with the real container runtime. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is wrong, we should evacuate the whole queue at this point (even if the queue grows while it's being evacuated). |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no idea if we have a place for it, but it would be nice to not publicly export this, as it increases our compat burden. this is especially bad as consumers do use this package, but it is not well factored/designed.