Skip to content

Commit

Permalink
Merge pull request #47 from aliyun/restorebuket
Browse files Browse the repository at this point in the history
add a new ResumableUploadObject API
  • Loading branch information
baiyubin2020 authored Dec 21, 2017
2 parents 5fc70e3 + 9254f7e commit 99a4dc0
Show file tree
Hide file tree
Showing 21 changed files with 500 additions and 169 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# ChangeLog - Aliyun OSS SDK for C#

## 版本号:2.7.0 日期:2017/12/21
### 变更内容
- 修复:优化异步机制实现
- 增加:支持断点并发上传下载
- 增加:支持符号链接symlink
- 增加:LifecycleRule支持碎片及所有类型的存储

## 版本号:2.6.0 日期:2017/11/30
### 变更内容
- 增加:CreateBucket支持StorageClass
Expand Down
2 changes: 1 addition & 1 deletion README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
- OSS C# SDK[在线文档](http://gosspublic.alicdn.com/AliyunNetSDK/apidocs/latest/index.html)

## 版本
- 当前版本:2.6.0
- 当前版本:2.7.0

## 运行环境

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
- OSS C# SDK[Online Documentation](http://gosspublic.alicdn.com/AliyunNetSDK/international/apidocs/latest/index.html).

## Version
- Current version: 2.6.0.
- Current version: 2.7.0.

## Run environment

Expand Down
4 changes: 2 additions & 2 deletions samples/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//
// You can specify all the values or you can use the default the Revision and
// Build Numbers by using the '*' as shown below:
[assembly: AssemblyVersion("2.6.0")]
[assembly: AssemblyVersion("2.7.0")]
[assembly: NeutralResourcesLanguage("zh-CN")]
[assembly: AssemblyFileVersion("2.6.0")]
[assembly: AssemblyFileVersion("2.7.0")]

43 changes: 24 additions & 19 deletions samples/Samples/ResumableSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@ public static class ResumbaleSample

static string fileToUpload = Config.BigFileToUpload;

public static void ResumableUploadObject(string bucketName)
public static void ResumableUploadObject(string bucketName)
{
const string key = "ResumableUploadObject";
string checkpointDir = Config.DirToDownload;
try
{
client.ResumableUploadObject(bucketName, key, fileToUpload, null, checkpointDir);
UploadObjectRequest request = new UploadObjectRequest(bucketName, key, fileToUpload)
{
PartSize = 8 * 1024 * 1024,
ParallelThreadCount = 3,
CheckpointDir = checkpointDir,
};
client.ResumableUploadObject(request);
Console.WriteLine("Resumable upload object:{0} succeeded", key);
}
catch (OssException ex)
Expand All @@ -45,16 +51,21 @@ public static void ResumableUploadObject(string bucketName)
}
}

public static void ResumableCopyObject(string sourceBucketName, string sourceKey,
string destBucketName, string destKey)
public static void ResumableDownloadObject(string bucketName)
{

const string key = "ResumableDownloadObject";
string fileToDownload = Config.DirToDownload + key;
string checkpointDir = Config.DirToDownload;
try
{
var request = new CopyObjectRequest(sourceBucketName, sourceKey, destBucketName, destKey);
client.ResumableCopyObject(request, checkpointDir);
Console.WriteLine("Resumable copy new object:{0} succeeded", request.DestinationKey);
DownloadObjectRequest request = new DownloadObjectRequest(bucketName, key, fileToDownload)
{
PartSize = 8 * 1024 * 1024,
ParallelThreadCount = 3,
CheckpointDir = Config.DirToDownload,
};
client.ResumableDownloadObject(request);
Console.WriteLine("Resumable download object:{0} succeeded", key);
}
catch (OssException ex)
{
Expand All @@ -67,21 +78,15 @@ public static void ResumableCopyObject(string sourceBucketName, string sourceKey
}
}

public static void ResumableDownloadObject(string bucketName)
public static void ResumableCopyObject(string sourceBucketName, string sourceKey,
string destBucketName, string destKey)
{
const string key = "ResumableDownloadObject";
string fileToDownload = Config.DirToDownload + key;
string checkpointDir = Config.DirToDownload;
try
{
DownloadObjectRequest request = new DownloadObjectRequest(bucketName, key, fileToDownload)
{
PartSize = 8 * 1024 * 1024,
ParallelThreadCount = 3,
CheckpointDir = Config.DirToDownload,
};
client.ResumableDownloadObject(request);
Console.WriteLine("Resumable download object:{0} succeeded", key);
var request = new CopyObjectRequest(sourceBucketName, sourceKey, destBucketName, destKey);
client.ResumableCopyObject(request, checkpointDir);
Console.WriteLine("Resumable copy new object:{0} succeeded", request.DestinationKey);
}
catch (OssException ex)
{
Expand Down
37 changes: 0 additions & 37 deletions sdk/Common/ClientConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class ClientConfiguration
private long _progressUpdateInterval = 1024 * 4;
private long _directWriteStreamThreshold = 0;
private long _maxPartCachingSize = 1024 * 1024 * 100;
private int _maxResumableUploadThreads = 8;
private int _maxResumableDownloadThreads = 8;
private int _preReadBufferCount = 8;
private bool _useSingleThreadReadInResumableUpload = false;

Expand Down Expand Up @@ -197,25 +195,6 @@ public long MaxPartCachingSize
}
}

/// <summary>
/// Gets or sets the max uploading threads per resumable upload call
/// In multipart upload (resumable upload), by default it's multithreaded upload. You can specify the max thread count used per call.
/// If the number is no more than 1, then use single thread.
/// </summary>
/// <value>The size of the max part caching.</value>
public int MaxResumableUploadThreads
{
get
{
return _maxResumableUploadThreads;
}
set
{
_maxResumableUploadThreads = value;

}
}

/// <summary>
/// Gets or sets the pre read buffer count in resumable upload.
/// The max value could be the same size of MaxResumableUploadThreads;
Expand Down Expand Up @@ -250,22 +229,6 @@ public bool UseSingleThreadReadInResumableUpload
}
}

/// <summary>
/// Gets or sets the max resumable download threads.
/// </summary>
/// <value>The max resumable download threads.</value>
public int MaxResumableDownloadThreads
{
get
{
return _maxResumableDownloadThreads;
}
set
{
_maxResumableDownloadThreads = value;
}
}

/// <summary>
/// Gets the default user agent
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions sdk/Common/Communication/ServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public ServiceResponse EndSend(IAsyncResult aysncResult)
{
var ar = aysncResult as AsyncResult<ServiceResponse>;
Debug.Assert(ar != null);
if (ar == null)
{
throw new ArgumentException("ar must be type of AsyncResult<ServiceResponse>");
}

try
{
Expand Down
116 changes: 81 additions & 35 deletions sdk/Common/Communication/ServiceClientImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ protected override ServiceResponse SendCore(ServiceRequest serviceRequest,
ExecutionContext context)
{
var request = HttpFactory.CreateWebRequest(serviceRequest, Configuration);
SetRequestContent(request, serviceRequest, false, null, Configuration);
SetRequestContent(request, serviceRequest, Configuration);
try
{
var response = request.GetResponse() as HttpWebResponse;
Expand All @@ -194,8 +194,9 @@ protected override IAsyncResult BeginSendCore(ServiceRequest serviceRequest,
Request = serviceRequest
};

SetRequestContent(request, serviceRequest, true,
() => request.BeginGetResponse(OnGetResponseCompleted, asyncResult), Configuration);
BeginSetRequestContent(request, serviceRequest,
() => request.BeginGetResponse(OnGetResponseCompleted, asyncResult), Configuration,
asyncResult);

return asyncResult;
}
Expand Down Expand Up @@ -236,19 +237,16 @@ private void OnGetResponseCompleted(IAsyncResult ar)
}
}

private static void SetRequestContent(HttpWebRequest webRequest, ServiceRequest serviceRequest,
bool async, OssAction asyncCallback, ClientConfiguration clientConfiguration)
private static void SetRequestContent(HttpWebRequest webRequest,
ServiceRequest serviceRequest,
ClientConfiguration clientConfiguration)
{
var data = serviceRequest.BuildRequestContent();

if (data == null ||
(serviceRequest.Method != HttpMethod.Put &&
serviceRequest.Method != HttpMethod.Post))
{
// Skip setting content body in this case.
if (async)
asyncCallback();

return;
}

Expand All @@ -273,39 +271,87 @@ private static void SetRequestContent(HttpWebRequest webRequest, ServiceRequest
}
}

if (async)
using (var requestStream = webRequest.GetRequestStream())
{
webRequest.BeginGetRequestStream(
(ar) =>
{
using (var requestStream = webRequest.EndGetRequestStream(ar))
{
if (!webRequest.SendChunked)
{
IoUtils.WriteTo(data, requestStream, webRequest.ContentLength);
}
else
{
IoUtils.WriteTo(data, requestStream);
}
}
asyncCallback();
}, null);
if (!webRequest.SendChunked)
{
IoUtils.WriteTo(data, requestStream, webRequest.ContentLength);
}
else
{
IoUtils.WriteTo(data, requestStream);
}
}
}

private static void BeginSetRequestContent(HttpWebRequest webRequest, ServiceRequest serviceRequest,
OssAction asyncCallback, ClientConfiguration clientConfiguration, HttpAsyncResult result)
{
var data = serviceRequest.BuildRequestContent();

if (data == null ||
(serviceRequest.Method != HttpMethod.Put &&
serviceRequest.Method != HttpMethod.Post))
{
// Skip setting content body in this case.
try
{
asyncCallback();
}
catch(Exception e)
{
result.WebRequest.Abort();
result.Complete(e);
}

return;
}

// Write data to the request stream.
long userSetContentLength = -1;
if (serviceRequest.Headers.ContainsKey(HttpHeaders.ContentLength))
userSetContentLength = long.Parse(serviceRequest.Headers[HttpHeaders.ContentLength]);

if (serviceRequest.UseChunkedEncoding || !data.CanSeek) // when data cannot seek, we have to use chunked encoding as there's no way to set the length
{
webRequest.SendChunked = true;
webRequest.AllowWriteStreamBuffering = false; // when using chunked encoding, the data is likely big and thus not use write buffer;
}
else
{
using (var requestStream = webRequest.GetRequestStream())
long streamLength = data.Length - data.Position;
webRequest.ContentLength = (userSetContentLength >= 0 &&
userSetContentLength <= streamLength) ? userSetContentLength : streamLength;
if (webRequest.ContentLength > clientConfiguration.DirectWriteStreamThreshold)
{
if (!webRequest.SendChunked)
{
IoUtils.WriteTo(data, requestStream, webRequest.ContentLength);
}
else
{
IoUtils.WriteTo(data, requestStream);
}
webRequest.AllowWriteStreamBuffering = false;
}
}

webRequest.BeginGetRequestStream(
(ar) =>
{
try
{
using (var requestStream = webRequest.EndGetRequestStream(ar))
{
if (!webRequest.SendChunked)
{
IoUtils.WriteTo(data, requestStream, webRequest.ContentLength);
}
else
{
IoUtils.WriteTo(data, requestStream);
}
}
asyncCallback();
}
catch(Exception e)
{
result.WebRequest.Abort();
result.Complete(e);
}
}, null);
}

private static ServiceResponse HandleException(WebException ex)
Expand Down
4 changes: 2 additions & 2 deletions sdk/Common/ResumableDownloadManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void DoResumableDownload(DownloadObjectRequest request, ResumableDownloa
{
// use single thread if MaxResumableUploadThreads is no bigger than 1
// or the part size is bigger than the conf.MaxPartCachingSize
if (resumableContext.PartContextList[0].Length > _conf.MaxPartCachingSize || _conf.MaxResumableDownloadThreads <= 1)
if (resumableContext.PartContextList[0].Length > _conf.MaxPartCachingSize || request.ParallelThreadCount <= 1)
{
DoResumableDownloadSingleThread(request, resumableContext, downloadProgressCallback);
}
Expand Down Expand Up @@ -185,7 +185,7 @@ private void DoResumableDownloadMultiThread(DownloadObjectRequest request, Resum
}

Exception e = null;
int parallel = Math.Min(Math.Min(_conf.MaxResumableDownloadThreads, resumableContext.PartContextList.Count), Environment.ProcessorCount);
int parallel = Math.Min(Math.Min(request.ParallelThreadCount, resumableContext.PartContextList.Count), Environment.ProcessorCount);
ManualResetEvent[] taskFinishedEvents = new ManualResetEvent[parallel];
DownloadTaskParam[] taskParams = new DownloadTaskParam[parallel];
int nextPart = 0;
Expand Down
Loading

0 comments on commit 99a4dc0

Please sign in to comment.