Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NDNts-aux snapshot #6

Merged
merged 15 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ucla-irl/ndnts-aux",
"version": "3.0.4",
"version": "4.0.0",
"description": "NDNts Auxiliary Package for Web and Deno",
"scripts": {
"test": "deno test --no-check",
Expand Down
113 changes: 111 additions & 2 deletions src/adaptors/yjs-ndn-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import * as Y from 'yjs';
import { Awareness } from 'y-protocols/awareness.js';
import { Bundler } from './bundler.ts';

// Adam Chen Additional Imports
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
import { Decoder, Encoder } from '@ndn/tlv';
import { Component, Data, Name } from '@ndn/packet';
import { Version } from '@ndn/naming-convention2';
import { StateVector } from '@ndn/svs';

/**
* NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel.
*
Expand Down Expand Up @@ -33,11 +39,14 @@ export class NdnSvsAdaptor {
useBundler: boolean = false,
) {
syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content));
// Adam Chen callback on receiving a snapshot blob for Injection Point 3
syncAgent.register('blob', 'snapshot', (content) => this.handleSnapshotUpdate(content));
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
doc.on('update', this.callback);
if (useBundler) {
// Adam Chen Injection Point 1 override
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
this.#bundler = new Bundler(
Y.mergeUpdates,
(content) => this.syncAgent.publishUpdate(this.topic, content),
(content) => this.publishUpdate(this.topic, content),
{
thresholdSize: 3000,
delayMs: 400,
Expand Down Expand Up @@ -96,9 +105,109 @@ export class NdnSvsAdaptor {
if (this.#bundler) {
await this.#bundler.produce(content);
} else {
await this.syncAgent.publishUpdate(this.topic, content);
// Adam Chen Injection point 1 override
await this.publishUpdate(this.topic, content);
}
}

// Adam Chen Injection point 1
private async publishUpdate(topic: string, content: Uint8Array) {
await this.syncAgent.publishUpdate(topic, content);

const stateVector = this.syncAgent.getUpdateSyncSV();
let count = 0;
for (const [_id, seq] of stateVector) {
count += seq;
}
// Snapshot Interval configuration: Currently hard-coded
// TODO: make the interval configurable
if (count % 5 == 0) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
const encodedSV = Encoder.encode(stateVector);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// TODO: Currently naming convention is hard-coded. May need organizing.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');
// New SVS encodings
const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV));

// Snapshot content generation
const content = Y.encodeStateAsUpdate(this.doc);
// its already in UInt8Array (binary), transporting currently without any additional encoding.
// use syncAgent's blob and publish mechanism
await this.syncAgent.publishBlob('snapshot', content, snapshotName, true);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// Race Condition note: Testing suggests that the write above with publishBlob()
// is near certainly done before the read happens below.
// Hence no delay is added.
// first segmented object is at /50=%00
const firstSegmentName = snapshotName.append('50=%00').toString();
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
// End Injection point 1

// -- Adam Chen Injection Point 3: HandleSnapshotUpdate --
async handleSnapshotUpdate(snapshotName: Uint8Array) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
// Maybe it's wise to put this under a try() because it might fail due to network issues.
const decodedSnapshotName = Decoder.decode(snapshotName, Name);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString());
let oldSVCount = 0;
if (oldSnapshotFirstSegmentEncoded) {
const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data);
const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value, StateVector);
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
for (const [_id, seq] of oldSnapshotVector) {
oldSVCount += seq;
}
}

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value, StateVector);
let snapshotSVcount = 0;
for (const [_id, seq] of snapshotSV) {
snapshotSVcount += seq;
}

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
if (snapshotSVcount > oldSVCount) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
const firstSegmentName = decodedSnapshotName.append('50=%00').toString();
// Race Condition Note: The callback to here is faster than
// fetchBlob() finish writing to persistStore.
// (in syncAgent before listener callback to here)
// Tested getBlob() to guarantee item arrival
// But ends up having multiple active sessions of fetchBlob(). bad.
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
// Hence a delay of 1 second.
await new Promise((r) => setTimeout(r, 1000));
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
// utilize snapshotPrefix above, with the same namingConvention warning.
// this is done to update the key of the prefix so program return latest when blind fetching.
this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
// should set snapshotPrefix to the newest packet.
} else {
console.debug('PersistentStorage doesnt have the snapshot yet. Skipping update.');
// If the above race condition fails (reads before data arrives),
// 'endpoint's blind fetch mechanism' is not updated to latest, should be fine.
}
}
}
// End Injection point 3

public handleSyncUpdate(content: Uint8Array) {
// Apply patch
Expand Down
17 changes: 16 additions & 1 deletion src/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as endpoint from '@ndn/endpoint';
import type { Forwarder } from '@ndn/fw';
import { Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Component, Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Decoder, Encoder } from '@ndn/tlv';
import { BufferChunkSource, DataProducer, fetch } from '@ndn/segmented-object';
import { concatBuffers, fromHex } from '@ndn/util';
Expand Down Expand Up @@ -353,6 +353,21 @@ export class SyncAgent implements AsyncDisposable {

async serve(interest: Interest) {
const intName = interest.name;

// -- Adam Chen Injection point 2 --
// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) {
const wire = await this.persistStorage.get(intName.toString());
if (wire === undefined || wire.length === 0) {
// console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`);
return undefined;
}
const data = Decoder.decode(wire, Data);
return data;
}
// -- End Injection point 2 --

if (intName.length <= this.appPrefix.length + 1) {
// The name should be at least two components plus app prefix
return undefined;
Expand Down