Skip to content

Commit

Permalink
Automation: main-next integrate
Browse files Browse the repository at this point in the history
  • Loading branch information
msfluid-bot committed Dec 12, 2023
2 parents 5d2dce4 + 0041dd3 commit 7fc7f65
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
MoveEffect,
isMoveIn,
isMoveOut,
getMoveIn,
} from "./moveEffectTable";
import {
getInputLength,
Expand Down Expand Up @@ -170,11 +171,24 @@ function composeMarks<TNodeChange>(
moveEffects: MoveEffectTable<TNodeChange>,
revisionMetadata: RevisionMetadataSource,
): Mark<TNodeChange> {
const nodeChange = composeChildChanges(
let nodeChange = composeChildChanges(
baseMark.changes,
newMark.changes === undefined ? undefined : tagChange(newMark.changes, newRev),
composeChild,
);
if (nodeChange !== undefined) {
const baseSource = getMoveIn(baseMark);
if (baseSource !== undefined) {
setModifyAfter(
moveEffects,
getEndpoint(baseSource, undefined),
nodeChange,
newRev,
composeChild,
);
nodeChange = undefined;
}
}
if (isImpactfulCellRename(newMark, newRev, revisionMetadata)) {
const newAttachAndDetach = asAttachAndDetach(newMark);
const newDetachRevision = newAttachAndDetach.detach.revision ?? newRev;
Expand Down Expand Up @@ -295,37 +309,14 @@ function composeMarks<TNodeChange>(
} else if (!markHasCellEffect(baseMark)) {
return withRevision(withNodeChange(newMark, nodeChange), newRev);
} else if (!markHasCellEffect(newMark)) {
if (isMoveIn(baseMark) && nodeChange !== undefined) {
setModifyAfter(
moveEffects,
getEndpoint(baseMark, undefined),
nodeChange,
newRev,
composeChild,
);
return baseMark;
}
return withNodeChange(baseMark, nodeChange);
} else if (areInputCellsEmpty(baseMark)) {
assert(isDetach(newMark), 0x71c /* Unexpected mark type */);
assert(isAttach(baseMark), 0x71d /* Expected generative mark */);
let localNodeChange = nodeChange;

const attach = extractMarkEffect(baseMark);
const detach = extractMarkEffect(withRevision(newMark, newRev));

if (isMoveIn(attach) && nodeChange !== undefined) {
setModifyAfter(
moveEffects,
getEndpoint(attach, undefined),
nodeChange,
newRev,
composeChild,
);

localNodeChange = undefined;
}

if (isMoveIn(attach) && isMoveOut(detach)) {
const finalSource = getEndpoint(attach, undefined);
const finalDest = getEndpoint(detach, newRev);
Expand Down Expand Up @@ -354,10 +345,7 @@ function composeMarks<TNodeChange>(

if (areEqualCellIds(getOutputCellId(newMark, newRev, revisionMetadata), baseMark.cellId)) {
// The output and input cell IDs are the same, so this mark has no effect.
return withNodeChange(
{ count: baseMark.count, cellId: baseMark.cellId },
localNodeChange,
);
return withNodeChange({ count: baseMark.count, cellId: baseMark.cellId }, nodeChange);
}
return normalizeCellRename(
{
Expand All @@ -367,7 +355,7 @@ function composeMarks<TNodeChange>(
attach,
detach,
},
localNodeChange,
nodeChange,
);
} else {
if (isMoveMark(baseMark) && isMoveMark(newMark)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ export function isMoveIn(effect: MarkEffect): effect is MoveIn {
return effect.type === "MoveIn";
}

export function getMoveIn(effect: MarkEffect): MoveIn | undefined {
switch (effect.type) {
case "MoveIn":
return effect;
case "AttachAndDetach":
return getMoveIn(effect.attach);
default:
return undefined;
}
}

function adjustMoveEffectBasis<T>(effect: MoveEffectWithBasis<T>, newBasis: MoveId): MoveEffect<T> {
if (effect.basis === newBasis) {
return effect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,23 @@ describe("SequenceField - Compose", () => {
assert.deepEqual(composed, expected);
});

it("move-in+delete ○ modify", () => {
const changes = TestChange.mint([], 42);
const [mo, mi] = Mark.move(1, { revision: tag1, localId: brand(1) });
const attachDetach = Mark.attachAndDetach(
mi,
Mark.delete(1, { revision: tag2, localId: brand(2) }),
);
const base = makeAnonChange([mo, attachDetach]);
const modify = tagChange(
[Mark.modify(changes, { revision: tag2, localId: brand(2) })],
tag3,
);
const actual = shallowCompose([base, modify]);
const expected = [{ ...mo, changes }, attachDetach];
assert.deepEqual(actual, expected);
});

it("effect management for [move, modify, move]", () => {
const changes = TestChange.mint([], 42);
const [mo, mi] = Mark.move(1, brand(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ describe("Fuzz - Top-Level", () => {
saveFailures: {
directory: failureDirectory,
},
skip: [42],
};
createDDSFuzzSuite(model, options);
});
Expand Down
19 changes: 16 additions & 3 deletions packages/loader/container-loader/src/connectionStateHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,25 @@ class ConnectionStateCatchup extends ConnectionStateHandlerPassThrough {
class ConnectionStateHandler implements IConnectionStateHandler {
private _connectionState = ConnectionState.Disconnected;
private _pendingClientId: string | undefined;

/**
* Tracks that we observe the "leave" op within the timeout for our previous clientId (see comment on ConnectionStateHandler class)
* ! This ensures we do not switch to a new clientId until we process all potential messages from old clientId
* ! i.e. We will always see the "leave" op for a client after we have seen all the ops it has sent
* ! This check helps prevent the same op from being resubmitted by the PendingStateManager upon reconnecting
*/
private readonly prevClientLeftTimer: Timer;

/**
* Tracks that we observe our own "join" op within the timeout after receiving a "connected" event from the DeltaManager
*/
private readonly joinOpTimer: Timer;

private protocol?: IProtocolHandler;
private connection?: IConnectionDetailsInternal;
private _clientId?: string;

/** Track how long we waited to see "leave" op for previous clientId */
private waitEvent: PerformanceEvent | undefined;

public get connectionState(): ConnectionState {
Expand Down Expand Up @@ -453,9 +466,9 @@ class ConnectionStateHandler implements IConnectionStateHandler {
0x2e2 /* "Must only wait for leave message when clientId in quorum" */,
);

// Move to connected state only if we are in Connecting state, we have seen our join op
// and there is no timer running which means we are not waiting for previous client to leave
// or timeout has occurred while doing so.
// Move to connected state only if:
// 1. We have seen our own "join" op (i.e. for this.pendingClientId)
// 2. There is no "leave" timer running, meaning this is our first connection or the previous client has left (via this.prevClientLeftTimer)
if (
this.pendingClientId !== this.clientId &&
this.hasMember(this.pendingClientId) &&
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3667,6 +3667,7 @@ export class ContainerRuntime
/**
* Finds the right store and asks it to resubmit the message. This typically happens when we
* reconnect and there are pending messages.
* ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer)
* @param message - The original LocalContainerRuntimeMessage.
* @param localOpMetadata - The local metadata associated with the original message.
*/
Expand Down
Loading

0 comments on commit 7fc7f65

Please sign in to comment.