Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write new pending message format in PendingStateManager #16136

Merged
merged 3 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { ContainerMessageType } from "./containerRuntime";
import { pkgVersion } from "./packageVersion";

/**
* ! TODO: Remove this interface in "2.0.0-internal.7.0.0" once we only read IPendingMessageNew
* ! TODO: Remove this interface in "2.0.0-internal.7.0.0" once we only read IPendingMessageNew (AB#4763)
*/
export interface IPendingMessageOld {
type: "message";
Expand All @@ -39,7 +39,7 @@ export interface IPendingMessageNew {
}

/**
* ! TODO: Remove this type in "2.0.0-internal.7.0.0"
* ! TODO: Remove this type in "2.0.0-internal.7.0.0" (AB#4763)
*/
export type IPendingState = IPendingMessageOld | IPendingMessageNew;

Expand Down Expand Up @@ -111,21 +111,18 @@ export class PendingStateManager implements IDisposable {
if (!this.pendingMessages.isEmpty()) {
return {
pendingStates: this.pendingMessages.toArray().map((message) => {
// ! TODO: Remove conversion to IPendingMessageOld in "2.0.0-internal.6.0.0" AB#3826
const content = JSON.parse(message.content);
let content = message.content;
const parsedContent = JSON.parse(content);
// IdAllocations need their localOpMetadata stashed in the contents
// of the op to correctly resume the session when processing stashed ops
if (content.type === ContainerMessageType.IdAllocation) {
content.contents.stashedState = message.localOpMetadata;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@justus-camp Is it possible to have this stashedState be part of the content when submitting the message (i.e. in maybeSubmitIdAllocationOp)?
I want to avoid parse then serialize as much as possible just to check the type add another prop to the content string

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used localOpMetadata because we don't want to submit the stashedState over the wire but need it to rehydrate the IdCompressor for stashed ops. My initial prototype actually did what you're suggesting and deleted stashedState from the op before submission, but there's a check in PendingStateManager::processPendingLocalMessage that makes doing it this way difficult and causes a DataProcessingError. I suppose we could ignore IdAllocation ops in that check but that's pretty unfortunate.

It's worth noting that overall this stashed op implementation isn't great and we kind of "retro-fitted" the original compressor to do it this way.

Copy link
Contributor Author

@kian-thompson kian-thompson Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to move the serialized state to a summary blob? I think it should, but I may be missing something. Regardless, I'll re-add the logic in the PendingStateManager for back/forward compat, but ideally the PSM wouldn't know about specific op types (can be fixed later on).

if (parsedContent.type === ContainerMessageType.IdAllocation) {
parsedContent.contents.stashedState = message.localOpMetadata;
content = JSON.stringify(parsedContent);
}
return {
...message,
messageType: content.type,
content: content.contents,
// delete localOpMetadata since it may not be serializable
// and will be regenerated by applyStashedOp()
localOpMetadata: undefined,
};

// delete localOpMetadata since it may not be serializable
// and will be regenerated by applyStashedOp()
return { ...message, content, localOpMetadata: undefined };
}),
};
}
Expand All @@ -137,7 +134,7 @@ export class PendingStateManager implements IDisposable {
) {
/**
* Convert old local state format to the new format (IPendingMessageOld to IPendingMessageNew)
* ! TODO: Remove this conversion in "2.0.0-internal.7.0.0"
* ! TODO: Remove this conversion in "2.0.0-internal.7.0.0" (AB#4763)
*/
if (initialLocalState?.pendingStates) {
for (const initialState of initialLocalState.pendingStates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {
ContainerRuntime,
IContainerRuntimeOptions,
} from "../containerRuntime";
import { IPendingMessageOld, PendingStateManager } from "../pendingStateManager";
import { IPendingMessageNew, PendingStateManager } from "../pendingStateManager";
import { DataStores } from "../dataStores";

describe("Runtime", () => {
Expand Down Expand Up @@ -1138,7 +1138,8 @@ describe("Runtime", () => {
assert.notStrictEqual(state, undefined, "expect pending local state");
assert.strictEqual(state?.pendingStates.length, 1, "expect 1 pending message");
assert.deepStrictEqual(
(state?.pendingStates[0] as IPendingMessageOld).content.contents,
JSON.parse((state?.pendingStates[0] as IPendingMessageNew).content).contents
.contents,
{
prop1: 1,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ describe("Pending State Manager", () => {
}

describe("Constructor conversion", () => {
// TODO: Remove in 2.0.0-internal.7.0.0 once only new format is read in constructor
// TODO: Remove in 2.0.0-internal.7.0.0 once only new format is read in constructor (AB#4763)
describe("deserialized content", () => {
it("Empty local state", () => {
{
Expand Down Expand Up @@ -367,90 +367,8 @@ describe("Pending State Manager", () => {
});
});

it("getLocalState writes new flush format", async () => {
const pendingStateManager = createPendingStateManager([
{
type: "message",
referenceSequenceNumber: 0,
opMetadata: { batch: true },
content: '{"type":"component"}',
},
{ type: "message", referenceSequenceNumber: 0, content: '{"type":"component"}' },
{
type: "message",
referenceSequenceNumber: 0,
opMetadata: { batch: false },
content: '{"type":"component"}',
},
{
type: "message",
referenceSequenceNumber: 0,
opMetadata: { batch: true },
content: '{"type":"component"}',
},
{
type: "message",
referenceSequenceNumber: 0,
opMetadata: { batch: false },
content: '{"type":"component"}',
},
{ type: "message", referenceSequenceNumber: 0, content: '{"type":"component"}' },
]);

await pendingStateManager.applyStashedOpsAt(0);

assert.deepStrictEqual(pendingStateManager.getLocalState().pendingStates, [
{
type: "message",
referenceSequenceNumber: 0,
localOpMetadata: undefined,
opMetadata: { batch: true },
content: undefined,
messageType: "component",
},
{
type: "message",
referenceSequenceNumber: 0,
localOpMetadata: undefined,
content: undefined,
messageType: "component",
},
{
type: "message",
referenceSequenceNumber: 0,
localOpMetadata: undefined,
opMetadata: { batch: false },
content: undefined,
messageType: "component",
},
{
type: "message",
referenceSequenceNumber: 0,
localOpMetadata: undefined,
opMetadata: { batch: true },
content: undefined,
messageType: "component",
},
{
type: "message",
referenceSequenceNumber: 0,
localOpMetadata: undefined,
opMetadata: { batch: false },
content: undefined,
messageType: "component",
},
{
type: "message",
referenceSequenceNumber: 0,
localOpMetadata: undefined,
content: undefined,
messageType: "component",
},
]);
});

// TODO: change to new format in "2.0.0-internal.6.0.0" (AB#3826)
it("getLocalState writes old message format", async () => {
// TODO: remove when we only read new format in "2.0.0-internal.7.0.0" (AB#4763)
it("getLocalState writes new message format", async () => {
const pendingStateManager = createPendingStateManager([
{ type: "message", messageType: "component" },
{ type: "message", content: '{"type":"component"}' },
Expand All @@ -466,27 +384,25 @@ describe("Pending State Manager", () => {
assert.deepStrictEqual(pendingStateManager.getLocalState().pendingStates, [
{
type: "message",
messageType: "component",
content: undefined,
content: '{"type":"component"}',
localOpMetadata: undefined,
messageType: "component", // This prop is still there, but it is not on the IPendingMessageNew interface
},
{
type: "message",
messageType: "component",
content: undefined,
content: '{"type":"component"}',
localOpMetadata: undefined,
},
{
type: "message",
messageType: "component",
content: { prop1: "value" },
content: '{"type": "component", "contents": {"prop1": "value"}}',
localOpMetadata: undefined,
},
{
type: "message",
messageType: "component",
content: { prop1: "value" },
content: '{"type":"component","contents":{"prop1":"value"}}',
localOpMetadata: undefined,
messageType: "component", // This prop is still there, but it is not on the IPendingMessageNew interface
},
]);
});
Expand Down
Loading