Skip to content

Commit

Permalink
Receive splited to chunks command.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Dec 18, 2023
1 parent d1cfd74 commit 63a22c5
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 205 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,75 @@
import * as Serialization from "./binary-serialization";
import { PeerCommandType, PeerCommand } from "./types";

const peerCommandTypes = Object.values(PeerCommandType);
const FRAME_PART_LENGTH = 4;
const commandFrameStart = stringToUtf8CodesBuffer("cstr", FRAME_PART_LENGTH);
const commandFrameEnd = stringToUtf8CodesBuffer("cend", FRAME_PART_LENGTH);
const commandDivFrameStart = stringToUtf8CodesBuffer("dstr", FRAME_PART_LENGTH);
const commandDivFrameEnd = stringToUtf8CodesBuffer("dend", FRAME_PART_LENGTH);
const startFrames = [commandFrameStart, commandDivFrameStart];
const endFrames = [commandFrameEnd, commandDivFrameEnd];
const commandFramesLength = commandFrameStart.length + commandFrameEnd.length;

export function isCommandChunk(buffer: Uint8Array) {
const length = commandFrameStart.length;
const bufferEndingToCompare = buffer.slice(-length);
return (
startFrames.some((frame) =>
areBuffersEqual(buffer, frame, FRAME_PART_LENGTH)
) &&
endFrames.some((frame) =>
areBuffersEqual(bufferEndingToCompare, frame, FRAME_PART_LENGTH)
)
);
}

function isFirstCommandChunk(buffer: Uint8Array) {
return areBuffersEqual(buffer, commandFrameStart, FRAME_PART_LENGTH);
}

function isLastCommandChunk(buffer: Uint8Array) {
return areBuffersEqual(
buffer.slice(-FRAME_PART_LENGTH),
commandFrameEnd,
FRAME_PART_LENGTH
);
}

export class BinaryCommandJoiningError extends Error {
constructor(readonly type: "incomplete-joining" | "no-first-chunk") {
super();
}
}

export class BinaryCommandChunksJoiner {
private readonly chunks = new Serialization.ResizableUint8Array();
private status: "joining" | "completed" = "joining";

constructor(
private readonly onComplete: (commandBuffer: Uint8Array) => void
) {}

const commandFrameStart = stringToUtf8CodesBuffer("cstr");
const commandFrameEnd = stringToUtf8CodesBuffer("cend");
const commandDivisionFrameStart = stringToUtf8CodesBuffer("dstr");
const commandDivisionFrameEnd = stringToUtf8CodesBuffer("dend");
const commandFrameLength = commandFrameStart.length + commandFrameEnd.length;
addCommandChunk(chunk: Uint8Array) {
if (this.status === "completed") return;

const isFirstChunk = isFirstCommandChunk(chunk);
if (!this.chunks.length && !isFirstChunk) {
throw new BinaryCommandJoiningError("no-first-chunk");
}
if (this.chunks.length && isFirstChunk) {
throw new BinaryCommandJoiningError("incomplete-joining");
}
this.chunks.push(this.unframeCommandChunk(chunk));

if (!isLastCommandChunk(chunk)) return;
this.status = "completed";
this.onComplete(this.chunks.getBuffer());
}

private unframeCommandChunk(chunk: Uint8Array) {
return chunk.slice(FRAME_PART_LENGTH, chunk.length - FRAME_PART_LENGTH);
}
}

export class BinaryCommandCreator {
private readonly bytes = new Serialization.ResizableUint8Array();
Expand All @@ -18,7 +80,6 @@ export class BinaryCommandCreator {
commandType: PeerCommandType,
private readonly maxChunkLength: number
) {
this.bytes.push("{".charCodeAt(0));
this.bytes.push(commandType);
}

Expand All @@ -37,11 +98,12 @@ export class BinaryCommandCreator {
}

complete() {
if (!this.bytes.length) throw new Error("Buffer is empty");
if (this.status === "completed") return;
this.status = "completed";

const unframedBuffer = this.bytes.getBuffer();
if (unframedBuffer.length + commandFrameLength <= this.maxChunkLength) {
if (unframedBuffer.length + commandFramesLength <= this.maxChunkLength) {
this.resultBuffers.push(
frameBuffer(unframedBuffer, commandFrameStart, commandFrameEnd)
);
Expand All @@ -50,27 +112,27 @@ export class BinaryCommandCreator {

let chunksAmount = Math.ceil(unframedBuffer.length / this.maxChunkLength);
if (
Math.ceil(unframedBuffer.length / chunksAmount) + commandFrameLength >
Math.ceil(unframedBuffer.length / chunksAmount) + commandFramesLength >
this.maxChunkLength
) {
chunksAmount++;
}

for (const [index, chunk] of splitBufferToEqualChunks(
for (const [i, chunk] of splitBufferToEqualChunks(
unframedBuffer,
chunksAmount
)) {
if (index === 0) {
if (i === 0) {
this.resultBuffers.push(
frameBuffer(chunk, commandFrameStart, commandDivisionFrameEnd)
frameBuffer(chunk, commandFrameStart, commandDivFrameEnd)
);
} else if (index === chunksAmount - 1) {
} else if (i === chunksAmount - 1) {
this.resultBuffers.push(
frameBuffer(chunk, commandDivisionFrameStart, commandFrameEnd)
frameBuffer(chunk, commandDivFrameStart, commandFrameEnd)
);
} else {
this.resultBuffers.push(
frameBuffer(chunk, commandDivisionFrameStart, commandDivisionFrameEnd)
frameBuffer(chunk, commandDivFrameStart, commandDivFrameEnd)
);
}
}
Expand All @@ -84,28 +146,14 @@ export class BinaryCommandCreator {
}
}

export function isCommandBuffer(bytes: Uint8Array) {
const [start, commandCode] = bytes;
const end = bytes[bytes.length - 1];

return (
start === "{".charCodeAt(0) &&
end === "}".charCodeAt(0) &&
peerCommandTypes.includes(commandCode)
);
}

export function deserializeCommand(bytes: Uint8Array): PeerCommand {
if (!isCommandBuffer(bytes)) {
throw new Error("Given bytes don't represent peer command.");
}
const [, commandCode] = bytes;
const [commandCode] = bytes;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const deserializedCommand: { [key: string]: any } = {
c: commandCode,
};

let offset = 2;
let offset = 1;
do {
const name = String.fromCharCode(bytes[offset]);
offset++;
Expand All @@ -130,8 +178,7 @@ export function deserializeCommand(bytes: Uint8Array): PeerCommand {
}
break;
}
} while (offset < bytes.length && bytes[offset] !== "}".charCodeAt(0));
// TODO: type guards
} while (offset < bytes.length);
return deserializedCommand as unknown as PeerCommand;
}

Expand All @@ -144,8 +191,11 @@ function getDataTypeFromByte(byte: number): Serialization.SerializedItem {
return typeCode as Serialization.SerializedItem;
}

function stringToUtf8CodesBuffer(string: string): Uint8Array {
const buffer = new Uint8Array(string.length);
function stringToUtf8CodesBuffer(string: string, length?: number): Uint8Array {
if (length && string.length !== length) {
throw new Error("Wrong string length");
}
const buffer = new Uint8Array(length ?? string.length);
for (let i = 0; i < string.length; i++) buffer[i] = string.charCodeAt(i);
return buffer;
}
Expand Down Expand Up @@ -174,3 +224,14 @@ function frameBuffer(

return result;
}

function areBuffersEqual(
buffer1: Uint8Array,
buffer2: Uint8Array,
length: number
) {
for (let i = 0; i < length; i++) {
if (buffer1[i] !== buffer2[i]) return false;
}
return true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ export function serializeSimilarIntArray(numbers: bigint[]) {
const { length } = binaryArray.getBytesChunks();
const commonPartWithLength = commonPart | (BigInt(length) & 0xffn);
binaryArray.unshift(serializeInt(commonPartWithLength));
result.push(binaryArray.getBytes());
result.push(binaryArray.getBuffer());
}

return result.getBytes();
return result.getBuffer();
}

export function deserializeSimilarIntArray(bytes: Uint8Array) {
Expand Down
45 changes: 28 additions & 17 deletions packages/p2p-media-loader-core/src/p2p/commands/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,53 +9,64 @@ import {
} from "./types";

function serializeSegmentAnnouncementCommand(
command: PeerSegmentAnnouncementCommand
command: PeerSegmentAnnouncementCommand,
maxChunkSize: number
) {
const { c: commandCode, p: loadingByHttp, l: loaded } = command;
const creator = new BinaryCommandCreator(commandCode);
const creator = new BinaryCommandCreator(commandCode, maxChunkSize);
if (loaded?.length) creator.addSimilarIntArr("l", loaded);
if (loadingByHttp?.length) {
creator.addSimilarIntArr("p", loadingByHttp);
}
creator.complete();
return creator.getResultBuffer();
return creator.getResultBuffers();
}

function serializePeerSegmentCommand(command: PeerSegmentCommand) {
const creator = new BinaryCommandCreator(command.c);
function serializePeerSegmentCommand(
command: PeerSegmentCommand,
maxChunkSize: number
) {
const creator = new BinaryCommandCreator(command.c, maxChunkSize);
creator.addInteger("i", command.i);
creator.complete();
return creator.getResultBuffer();
return creator.getResultBuffers();
}

function serializePeerSendSegmentCommand(command: PeerSendSegmentCommand) {
const creator = new BinaryCommandCreator(command.c);
function serializePeerSendSegmentCommand(
command: PeerSendSegmentCommand,
maxChunkSize: number
) {
const creator = new BinaryCommandCreator(command.c, maxChunkSize);
creator.addInteger("i", command.i);
creator.addInteger("s", command.s);
creator.complete();
return creator.getResultBuffer();
return creator.getResultBuffers();
}

function serializePeerSegmentRequestCommand(
command: PeerRequestSegmentCommand
command: PeerRequestSegmentCommand,
maxChunkSize: number
) {
const creator = new BinaryCommandCreator(command.c);
const creator = new BinaryCommandCreator(command.c, maxChunkSize);
creator.addInteger("i", command.i);
if (command.b) creator.addInteger("b", command.b);
creator.complete();
return creator.getResultBuffer();
return creator.getResultBuffers();
}

export function serializePeerCommand(command: PeerCommand) {
export function serializePeerCommand(
command: PeerCommand,
maxChunkSize: number
) {
switch (command.c) {
case PeerCommandType.CancelSegmentRequest:
case PeerCommandType.SegmentAbsent:
return serializePeerSegmentCommand(command);
return serializePeerSegmentCommand(command, maxChunkSize);
case PeerCommandType.SegmentRequest:
return serializePeerSegmentRequestCommand(command);
return serializePeerSegmentRequestCommand(command, maxChunkSize);
case PeerCommandType.SegmentsAnnouncement:
return serializeSegmentAnnouncementCommand(command);
return serializeSegmentAnnouncementCommand(command, maxChunkSize);
case PeerCommandType.SegmentData:
return serializePeerSendSegmentCommand(command);
return serializePeerSendSegmentCommand(command, maxChunkSize);
}
}
7 changes: 6 additions & 1 deletion packages/p2p-media-loader-core/src/p2p/commands/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export * from "./types";
export { serializePeerCommand } from "./commands";
export { deserializeCommand, isCommandBuffer } from "./binary-command-creator";
export {
deserializeCommand,
isCommandChunk,
BinaryCommandChunksJoiner,
BinaryCommandJoiningError,
} from "./binary-command-creator";
14 changes: 7 additions & 7 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class P2PLoader {
if (!peer) return;

const request = this.requests.getOrCreateRequest(segment);
peer.fulfillSegmentRequest(request);
peer.downloadSegment(request);
}

isLoadingOrLoadedBySomeone(segment: Segment): boolean {
Expand Down Expand Up @@ -87,18 +87,18 @@ export class P2PLoader {
}

private onPeerConnected = (peer: Peer) => {
const announcement = this.getSegmentsAnnouncement();
peer.sendSegmentsAnnouncement(announcement);
const { httpLoading, loaded } = this.getSegmentsAnnouncement();
peer.sendSegmentsAnnouncementCommand(loaded, httpLoading);
};

broadcastAnnouncement = () => {
if (this.isAnnounceMicrotaskCreated) return;

this.isAnnounceMicrotaskCreated = true;
queueMicrotask(() => {
const announcement = this.getSegmentsAnnouncement();
const { httpLoading, loaded } = this.getSegmentsAnnouncement();
for (const peer of this.trackerClient.peers()) {
peer.sendSegmentsAnnouncement(announcement);
peer.sendSegmentsAnnouncementCommand(loaded, httpLoading);
}
this.isAnnounceMicrotaskCreated = false;
});
Expand All @@ -116,10 +116,10 @@ export class P2PLoader {
if (!segment) return;
const segmentData = await this.segmentStorage.getSegmentData(segment);
if (!segmentData) {
peer.sendSegmentAbsent(segmentExternalId);
peer.sendSegmentAbsentCommand(segmentExternalId);
return;
}
void peer.sendSegmentData(
void peer.uploadSegmentData(
segmentExternalId,
byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData
);
Expand Down
Loading

0 comments on commit 63a22c5

Please sign in to comment.