Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 38 additions & 35 deletions lib/lib-storage/src/Upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ import {
Tag,
UploadPartCommand,
} from "@aws-sdk/client-s3";
import { AbortController } from "@smithy/abort-controller";
import {
EndpointParameterInstructionsSupplier,
getEndpointFromInstructions,
toEndpointV1,
} from "@smithy/middleware-endpoint";
import { HttpRequest } from "@smithy/protocol-http";
import { extendedEncodeURIComponent } from "@smithy/smithy-client";
import type { AbortController as IAbortController, AbortSignal as IAbortSignal, Endpoint } from "@smithy/types";
import type { Endpoint } from "@smithy/types";
import { EventEmitter } from "events";

import { byteLength } from "./bytelength";
import { getChunk } from "./chunker";
import { wireSignal } from "./signal";
import { BodyDataTypes, Options, Progress } from "./types";

export interface RawDataPart {
Expand Down Expand Up @@ -59,8 +59,7 @@ export class Upload extends EventEmitter {
private bytesUploadedSoFar: number;

// used in the upload.
private abortController: IAbortController;
private concurrentUploaders: Promise<void>[] = [];
private abortController = new AbortController();
private createMultiPartPromise?: Promise<CreateMultipartUploadCommandOutput>;
private abortMultipartUploadCommand: AbortMultipartUploadCommand | null = null;

Expand Down Expand Up @@ -93,7 +92,9 @@ export class Upload extends EventEmitter {
// set progress defaults
this.totalBytes = byteLength(this.params.Body);
this.bytesUploadedSoFar = 0;
this.abortController = options.abortController ?? new AbortController();

wireSignal(this.abortController, options.abortSignal);
wireSignal(this.abortController, options.abortController?.signal);
}

async abort(): Promise<void> {
Expand All @@ -111,7 +112,12 @@ export class Upload extends EventEmitter {
);
}
this.sent = true;
return await Promise.race([this.__doMultipartUpload(), this.__abortTimeout(this.abortController.signal)]);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race here could cause the returned promise to be rejected when __doMultipartUpload actually succeeds (and is still working on updating tags).


try {
return await this.__doMultipartUpload();
} finally {
this.abortController.abort();
}
}

public on(event: "httpUploadProgress", listener: (progress: Progress) => void): this {
Expand Down Expand Up @@ -143,7 +149,12 @@ export class Upload extends EventEmitter {
eventEmitter.on("xhr.upload.progress", uploadEventListener);
}

const resolved = await Promise.all([this.client.send(new PutObjectCommand(params)), clientConfig?.endpoint?.()]);
const resolved = await Promise.all([
this.client.send(new PutObjectCommand(params), {
abortSignal: this.abortController.signal,
}),
clientConfig?.endpoint?.(),
]);
const putResult = resolved[0];
let endpoint: Endpoint | undefined = resolved[1];

Expand Down Expand Up @@ -291,7 +302,10 @@ export class Upload extends EventEmitter {
UploadId: this.uploadId,
Body: dataPart.data,
PartNumber: dataPart.partNumber,
})
}),
{
abortSignal: this.abortController.signal,
}
);

if (eventEmitter !== null) {
Expand Down Expand Up @@ -333,28 +347,27 @@ export class Upload extends EventEmitter {

private async __doMultipartUpload(): Promise<CompleteMultipartUploadCommandOutput> {
const dataFeeder = getChunk(this.params.Body, this.partSize);
const concurrentUploaderFailures: Error[] = [];
const concurrentUploads: Promise<void>[] = [];

for (let index = 0; index < this.queueSize; index++) {
const currentUpload = this.__doConcurrentUpload(dataFeeder).catch((err) => {
concurrentUploaderFailures.push(err);
});
this.concurrentUploaders.push(currentUpload);
const currentUpload = this.__doConcurrentUpload(dataFeeder);
concurrentUploads.push(currentUpload);
}

await Promise.all(this.concurrentUploaders);
if (concurrentUploaderFailures.length >= 1) {
/**
* Previously, each promise in concurrentUploads could potentially throw
* and immediately return control to user code. However, we want to wait for
* all uploaders to finish before calling AbortMultipartUpload to avoid
* stranding uploaded parts.
*
* We throw only the first error to be consistent with prior behavior,
* but may consider combining the errors into a report in the future.
*/
const results = await Promise.allSettled(concurrentUploads);
const firstFailure = results.find((result) => result.status === "rejected");
if (firstFailure) {
await this.markUploadAsAborted();
/**
* Previously, each promise in concurrentUploaders could potentially throw
* and immediately return control to user code. However, we want to wait for
* all uploaders to finish before calling AbortMultipartUpload to avoid
* stranding uploaded parts.
*
* We throw only the first error to be consistent with prior behavior,
* but may consider combining the errors into a report in the future.
*/
throw concurrentUploaderFailures[0];
throw firstFailure.reason;
}

if (this.abortController.signal.aborted) {
Expand Down Expand Up @@ -417,16 +430,6 @@ export class Upload extends EventEmitter {
}
}

private async __abortTimeout(abortSignal: IAbortSignal): Promise<never> {
return new Promise((resolve, reject) => {
abortSignal.onabort = () => {
const abortError = new Error("Upload aborted.");
abortError.name = "AbortError";
reject(abortError);
};
});
}

private __validateInput(): void {
if (!this.params) {
throw new Error(`InputError: Upload requires params to be passed to upload.`);
Expand Down
51 changes: 51 additions & 0 deletions lib/lib-storage/src/signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { IAbortSignal } from "./types";

/**
* This function wires an external abort signal to an internal abort controller.
* The internal abort controller will be aborted when the external signal is
* aborted.
*
* Every callback created will be removed as soon as either the internal or
* external signal is aborted. This allows to avoid memory leaks, especially if
* the external signal has a (significantly) longer lifespan than the internal
* one.
*
* In order to ensure that any references are removed, make sure to always
* `abort()` the internal controller when you are done with it.
*/
export function wireSignal(internalController: AbortController, externalSignal?: IAbortSignal): void {
if (!externalSignal || internalController.signal.aborted) {
return;
}
if (externalSignal.aborted) {
internalController.abort();
return;
}

if (isNativeSignal(externalSignal)) {
externalSignal.addEventListener("abort", () => internalController.abort(), {
once: true,
signal: internalController.signal,
});
} else {
// backwards compatibility
const origOnabort = externalSignal.onabort;
const restore = () => {
externalSignal.onabort = origOnabort;
};

externalSignal.onabort = function () {
internalController.abort();
restore();
origOnabort?.call(this);
};

// Let's clear any reference to the internal controller when it is aborted,
// avoiding potential memory leaks.
internalController.signal.addEventListener("abort", restore, { once: true });
}
}

export function isNativeSignal(signal: IAbortSignal): signal is globalThis.AbortSignal {
return "addEventListener" in signal && typeof signal.addEventListener === "function";
}
14 changes: 12 additions & 2 deletions lib/lib-storage/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
Tag,
UploadPartCommandInput,
} from "@aws-sdk/client-s3";
import type { AbortController } from "@smithy/types";
import type { AbortController, AbortSignal } from "@smithy/types";

export interface Progress {
loaded?: number;
Expand All @@ -19,6 +19,9 @@ export interface Progress {
// string | Uint8Array | Buffer | Readable | ReadableStream | Blob.
export type BodyDataTypes = PutObjectCommandInput["Body"];

export type IAbortController = AbortController | globalThis.AbortController;
export type IAbortSignal = AbortSignal | globalThis.AbortSignal;

/**
* @deprecated redundant, use {@link S3Client} directly.
*/
Expand Down Expand Up @@ -51,8 +54,15 @@ export interface Configuration {

/**
* Optional abort controller for controlling this upload's abort signal externally.
*
* @deprecated use `abortSignal` instead.
*/
abortController?: IAbortController;

/**
* Optional abort signal for controlling this upload's abort signal externally.
*/
abortController?: AbortController;
abortSignal?: globalThis.AbortSignal;
}

export interface Options extends Partial<Configuration> {
Expand Down
Loading