diff --git a/service/s3.cc b/service/s3.cc index e33e8af3..019c6576 100644 --- a/service/s3.cc +++ b/service/s3.cc @@ -889,6 +889,19 @@ finishMultiPartUpload(const std::string & bucket, } } +void +S3Api:: +abortMultiPartUpload(const std::string & bucket, + const std::string & resource, + const std::string & uploadId) + const +{ + auto result = erase(bucket, resource, "uploadId=" + uploadId); + if (result.code_ != 204) { + throw ML::Exception("error aborting multipart upload: " + uploadId); + } +} + void S3Api::MultiPartUploadPart:: fromXml(tinyxml2::XMLElement * element) @@ -1965,8 +1978,8 @@ struct StreamingUploadSource { struct Impl { Impl() - : offset(0), chunkIndex(0), shutdown(false), - chunks(16) + : offset(0), chunkIndex(0), shutdown(false), chunks(16), + exceptionThrown(false), uploadAborted(false) { } @@ -2062,6 +2075,8 @@ struct StreamingUploadSource { std::mutex etagsLock; std::vector etags; std::exception_ptr exc; + bool exceptionThrown; + bool uploadAborted; ML::OnUriHandlerException onException; void start() @@ -2095,8 +2110,10 @@ struct StreamingUploadSource { std::streamsize write(const char_type* s, std::streamsize n) { - if (exc) - std::rethrow_exception(exc); + if (exc) { + handleException(); + return 0; + } size_t done = current.append(s, n); offset += done; @@ -2108,8 +2125,10 @@ struct StreamingUploadSource { //cerr << "writing " << n << " characters returned " // << done << endl; - if (exc) - std::rethrow_exception(exc); + if (exc) { + handleException(); + return 0; + } return done; } @@ -2129,8 +2148,11 @@ struct StreamingUploadSource { void finish() { - if (exc) - std::rethrow_exception(exc); + if (exc) { + handleException(); + return; + } + // cerr << "pushing last chunk " << chunkIndex << endl; flush(); @@ -2147,23 +2169,20 @@ struct StreamingUploadSource { // Make sure that an exception in uploading the last chunk doesn't // lead to a corrupt (truncated) file - if (exc) - std::rethrow_exception(exc); + if (exc) { + handleException(); + return; + } - string etag; try { - etag = owner->finishMultiPartUpload(bucket, "/" + object, - uploadId, - etags); + owner->finishMultiPartUpload(bucket, "/" + object, uploadId, + etags); } catch (...) { + owner->abortMultiPartUpload(bucket, "/" + object, uploadId); onException(); throw; } - //cerr << "final etag is " << etag << endl; - - if (exc) - std::rethrow_exception(exc); // double elapsed = Date::now().secondsSince(startDate); @@ -2173,18 +2192,38 @@ struct StreamingUploadSource { // << "MB/s" << " to " << etag << endl; } + void handleException() + { + if (!uploadAborted) { + owner->abortMultiPartUpload(bucket, "/" + object, uploadId); + uploadAborted = true; + } + if (!exceptionThrown) { + exceptionThrown = true; + std::rethrow_exception(exc); + } + } + + /* upload threads */ void runThread() { while (!shutdown) { Chunk chunk; if (chunks.tryPop(chunk, 0.01)) { - if (exc) + if (exc) { + while (chunks.tryPop(chunk)); return; + } try { //cerr << "got chunk " << chunk.index // << " with " << chunk.size << " bytes at index " // << chunk.index << endl; + if (chunk.index == metadata.throwChunk) { + throw ML::Exception("deterministic upload" + " exception at chunk %d", + chunk.index); + } // Upload the data string md5 = md5HashToHex(chunk.data, chunk.size); @@ -2537,10 +2576,12 @@ struct RegisterS3Handler { throw ML::Exception("unknown aws option " + name + "=" + value + " opening S3 object " + resource); } - else if(name == "num-threads") - { + else if (name == "num-threads") { md.numThreads = std::stoi(value); } + else if (name == "throw-chunk") { + md.throwChunk = std::stoi(value); + } else { cerr << "warning: skipping unknown S3 option " << name << "=" << value << endl; diff --git a/service/s3.h b/service/s3.h index d4eab6c7..b3222a18 100644 --- a/service/s3.h +++ b/service/s3.h @@ -245,7 +245,7 @@ struct S3Api : public AwsApi { ObjectMetadata(const Redundancy & redundancy) : redundancy(redundancy), serverSideEncryption(SSE_NONE), - numThreads(8) + numThreads(8), throwChunk(-1) { } @@ -259,6 +259,10 @@ struct S3Api : public AwsApi { std::map metadata; std::string acl; unsigned int numThreads; + + /* Index of the chunk during a write operation after which to emulate + an HTTP exception. */ + int throwChunk; }; /** Signed request that can be executed. */ @@ -558,6 +562,11 @@ struct S3Api : public AwsApi { const std::string & uploadId, const std::vector & etags) const; + void + abortMultiPartUpload(const std::string & bucket, + const std::string & resource, + const std::string & uploadId) const; + void uploadRecursive(std::string dirSrc, std::string bucketDest, bool includeDir);