diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index cfb35c85..402956a2 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -123,6 +123,7 @@ module.exports = function(configure) { }); }, fromS3: (file, opts) => { + let pass = PassThrough(); let obj = s3.getObject({ Bucket: file.bucket || file.Bucket, Key: file.key || file.Key, @@ -135,14 +136,68 @@ module.exports = function(configure) { [ "httpHeaders", "httpUploadProgress", - "httpDownloadProgress", + "httpDownloadProgress" ].map(event => { obj.on(event, (...args) => { - // console.log("Event:", event); + //console.log("Event:", event); stream.emit(event, ...args); }); }); - return stream; + stream.on("error", async (err) => { + let passAlong = true; + if (err.code === "NoSuchKey") { + try { + await ls.tryFinalizeMultipartUpload(file); + passAlong = false; + stream.unpipe(pass); + let newStream = s3.getObject({ + Bucket: file.bucket || file.Bucket, + Key: file.key || file.Key, + Range: file.range || undefined + }).createReadStream(); + newStream.on("error", err => pass.emit("error", err)); + ls.pipe(newStream, pass); + } catch (err) { + logger.error("Error Looking for partial Multipart Upload", err); + } + } + if (passAlong) { + pass.emit("error", err); + } + }); + stream.pipe(pass); + return pass; + }, + tryFinalizeMultipartUpload: async (file) => { + logger.debug("MultipartUpload Start", file); + let bucket = file.bucket || file.Bucket; + let key = file.key || file.Key; + let mpResponse = await s3.listMultipartUploads({ + Bucket: bucket, + Prefix: key + }).promise(); + if (mpResponse.Uploads.length) { + let file = mpResponse.Uploads[0]; + let allParts = await s3.listParts({ + Bucket: bucket, + Key: file.Key, + UploadId: file.UploadId + }).promise(); + let parts = allParts.Parts.map(p => ({ ETag: p.ETag, PartNumber: p.PartNumber })) + if (parts.length > 0) { + let params = { + Bucket: bucket, + Key: file.Key, + UploadId: file.UploadId, + MultipartUpload: { + Parts: parts + } + }; + console.log("MultipartUpload params:", params); + await s3.completeMultipartUpload(params).promise(); + logger.log(`MultipartUpload Complete:`, bucket, file.Key, file.UploadId); + } + } }, asEvent: _streams.asEvent, log: _streams.log, @@ -1448,7 +1503,7 @@ module.exports = function(configure) { async.eachOfSeries(items, (item, i, done) => { var cb = done; - done = (err) => { + done = function(err) { if (item.s3 != null) { free_s3_stream(i); }