Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output file upload sample #272

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 73 additions & 15 deletions Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,14 @@ private static CloudPool createPoolIfNotExists(BatchClient client, String poolId
* storage account name
* @param storageAccountKey
* storage account key
* @param containerName
* container name
* @return CloudBlobContainer instance
* @throws URISyntaxException
* @throws StorageException
*/
private static CloudBlobContainer createBlobContainer(String storageAccountName, String storageAccountKey)
private static CloudBlobContainer createBlobContainer(String storageAccountName, String storageAccountKey, String containerName)
throws URISyntaxException, StorageException {
String CONTAINER_NAME = "poolsandresourcefiles";

// Create storage credential from name and key
StorageCredentials credentials = new StorageCredentialsAccountAndKey(storageAccountName, storageAccountKey);
Expand All @@ -144,7 +145,26 @@ private static CloudBlobContainer createBlobContainer(String storageAccountName,

// Get a reference to a container.
// The container name must be lower case
return blobClient.getContainerReference(CONTAINER_NAME);
return blobClient.getContainerReference(containerName);
}

/**
therahulkumar marked this conversation as resolved.
Show resolved Hide resolved
* @param sharedAccessBlobPermissions
* @param expiry
* expiry in number of days
* @return
* shared access blob policy with permission and expiry
*/
private static SharedAccessBlobPolicy getSharedAccessBlobPolicy(SharedAccessBlobPermissions sharedAccessBlobPermissions, int expiry) {
SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy();
EnumSet<SharedAccessBlobPermissions> perEnumSet = EnumSet.of(sharedAccessBlobPermissions);
policy.setPermissions(perEnumSet);

Calendar c = Calendar.getInstance();
c.setTime(new Date());
c.add(Calendar.DATE, expiry);
policy.setSharedAccessExpiryTime(c.getTime());
return policy;
}

/**
Expand Down Expand Up @@ -173,27 +193,53 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil
blob.upload(new FileInputStream(source), source.length());

// Create policy with 1 day read permission
SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy();
EnumSet<SharedAccessBlobPermissions> perEnumSet = EnumSet.of(SharedAccessBlobPermissions.READ);
policy.setPermissions(perEnumSet);

Calendar c = Calendar.getInstance();
c.setTime(new Date());
c.add(Calendar.DATE, 1);
policy.setSharedAccessExpiryTime(c.getTime());
SharedAccessBlobPolicy policy = getSharedAccessBlobPolicy(SharedAccessBlobPermissions.READ, 1);

// Create SAS key
String sas = blob.generateSharedAccessSignature(policy, null);
return blob.getUri() + "?" + sas;
}

/**
*
* @param container
* container to upload the file
* @param filePattern
* all the files matching the given pattern will be upload to container
* @return list of output files
* @throws StorageException
* @throws InvalidKeyException
*/
private static List<OutputFile> getOutputFiles(CloudBlobContainer container, String filePattern) throws StorageException, InvalidKeyException {
therahulkumar marked this conversation as resolved.
Show resolved Hide resolved
// Create policy with 1 day write permission
SharedAccessBlobPolicy policy = getSharedAccessBlobPolicy(SharedAccessBlobPermissions.WRITE, 1);
String sasToken = container.generateSharedAccessSignature(policy, null);
String containerSasUrl = String.format("%s?%s", container.getUri().toString(), sasToken);
OutputFileBlobContainerDestination outputFileBlobContainerDestination =
new OutputFileBlobContainerDestination()
.withContainerUrl(containerSasUrl);

List<OutputFile> outputFiles = new ArrayList<>();
OutputFile outputFile = new OutputFile()
.withFilePattern(filePattern)
.withDestination(new OutputFileDestination()
.withContainer(outputFileBlobContainerDestination))
.withUploadOptions(new OutputFileUploadOptions()
.withUploadCondition(OutputFileUploadCondition.TASK_COMPLETION));

outputFiles.add(outputFile);
return outputFiles;
}

/**
* Create a job with a single task
*
* @param client
* batch client instance
* @param container
therahulkumar marked this conversation as resolved.
Show resolved Hide resolved
* blob container to upload the resource file
* @param outputFileUploadContainer
* blob container to upload the output files generated during task execution
* @param poolId
* pool id
* @param jobId
Expand All @@ -204,7 +250,7 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil
* @throws InvalidKeyException
* @throws URISyntaxException
*/
private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer container, String poolId,
private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer container, CloudBlobContainer outputFileUploadContainer, String poolId,
String jobId)
throws BatchErrorException, IOException, StorageException, InvalidKeyException, URISyntaxException {
String BLOB_FILE_NAME = "test.txt";
Expand All @@ -217,7 +263,9 @@ private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer c

// Create task
TaskAddParameter taskToAdd = new TaskAddParameter();
taskToAdd.withId("mytask").withCommandLine(String.format("cat %s", BLOB_FILE_NAME));
// Create files named test2.txt and test3.csv. later we can transfer these files from vm to storage container using a file pattern
String command = String.format("cat %s && cp %s test2.txt && cp %s test3.csv", BLOB_FILE_NAME, BLOB_FILE_NAME, BLOB_FILE_NAME);
taskToAdd.withId("mytask").withCommandLine(String.format("/bin/bash -c \"%s\"", command));

String sas = uploadFileToCloud(container, BLOB_FILE_NAME, LOCAL_FILE_PATH);

Expand All @@ -228,6 +276,9 @@ private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer c
files.add(file);
taskToAdd.withResourceFiles(files);

//
taskToAdd.withOutputFiles(getOutputFiles(outputFileUploadContainer, "*.txt"));

// Add task to job
client.taskOperations().createTask(jobId, taskToAdd);
}
Expand Down Expand Up @@ -319,7 +370,9 @@ public static void main(String argv[]) throws Exception {
BatchClient client = BatchClient.open(cred);

// Create storage container
CloudBlobContainer container = createBlobContainer(storageAccountName, storageAccountKey);
CloudBlobContainer container = createBlobContainer(storageAccountName, storageAccountKey, "poolsandresourcefiles");
CloudBlobContainer taskOutputFileUploadContainer = createBlobContainer(storageAccountName, storageAccountKey, "taskoutputfiles");
taskOutputFileUploadContainer.createIfNotExists();

String userName = System.getProperty("user.name");
String poolId = userName + "-pooltest";
Expand All @@ -328,8 +381,12 @@ public static void main(String argv[]) throws Exception {

try {
CloudPool sharedPool = createPoolIfNotExists(client, poolId);
submitJobAndAddTask(client, container, sharedPool.id(), jobId);
submitJobAndAddTask(client, container, taskOutputFileUploadContainer, sharedPool.id(), jobId);
if (waitForTasksToComplete(client, jobId, TASK_COMPLETE_TIMEOUT)) {
// Check of the task output files uploaded to container
for (ListBlobItem listBlobItem : taskOutputFileUploadContainer.listBlobs()) {
System.out.println("Uri of uploaded files after task completion: " + listBlobItem.getUri().toString());
}
// Get the task command output file
CloudTask task = client.taskOperations().getTask(jobId, "mytask");

Expand Down Expand Up @@ -364,6 +421,7 @@ public static void main(String argv[]) throws Exception {

if (shouldDeleteContainer) {
container.deleteIfExists();
taskOutputFileUploadContainer.deleteIfExists();
}
}
}
Expand Down