From 67f21ea66a2a3a183678da2a763ef95c22803952 Mon Sep 17 00:00:00 2001 From: therahulkumar Date: Sat, 22 Jun 2019 13:51:17 +0530 Subject: [PATCH 1/4] Added a sample to upload output files to container --- .../src/main/java/PoolAndResourceFile.java | 66 ++++++++++++++----- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java index 53483da1..76d14165 100644 --- a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java +++ b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java @@ -124,13 +124,14 @@ private static CloudPool createPoolIfNotExists(BatchClient client, String poolId * storage account name * @param storageAccountKey * storage account key + * @param containerName + * container name * @return CloudBlobContainer instance * @throws URISyntaxException * @throws StorageException */ - private static CloudBlobContainer createBlobContainer(String storageAccountName, String storageAccountKey) + private static CloudBlobContainer createBlobContainer(String storageAccountName, String storageAccountKey, String containerName) throws URISyntaxException, StorageException { - String CONTAINER_NAME = "poolsandresourcefiles"; // Create storage credential from name and key StorageCredentials credentials = new StorageCredentialsAccountAndKey(storageAccountName, storageAccountKey); @@ -144,7 +145,19 @@ private static CloudBlobContainer createBlobContainer(String storageAccountName, // Get a reference to a container. // The container name must be lower case - return blobClient.getContainerReference(CONTAINER_NAME); + return blobClient.getContainerReference(containerName); + } + + private static SharedAccessBlobPolicy getSharedAccessBlobPolicy(SharedAccessBlobPermissions sharedAccessBlobPermissions, int expiry) { + SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy(); + EnumSet perEnumSet = EnumSet.of(sharedAccessBlobPermissions); + policy.setPermissions(perEnumSet); + + Calendar c = Calendar.getInstance(); + c.setTime(new Date()); + c.add(Calendar.DATE, expiry); + policy.setSharedAccessExpiryTime(c.getTime()); + return policy; } /** @@ -173,20 +186,35 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil blob.upload(new FileInputStream(source), source.length()); // Create policy with 1 day read permission - SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy(); - EnumSet perEnumSet = EnumSet.of(SharedAccessBlobPermissions.READ); - policy.setPermissions(perEnumSet); - - Calendar c = Calendar.getInstance(); - c.setTime(new Date()); - c.add(Calendar.DATE, 1); - policy.setSharedAccessExpiryTime(c.getTime()); + SharedAccessBlobPolicy policy = getSharedAccessBlobPolicy(SharedAccessBlobPermissions.READ, 1); // Create SAS key String sas = blob.generateSharedAccessSignature(policy, null); return blob.getUri() + "?" + sas; } + + private static List getOutputFiles(CloudBlobContainer container, String filePattern) throws StorageException, InvalidKeyException { + // Create policy with 1 day write permission + SharedAccessBlobPolicy policy = getSharedAccessBlobPolicy(SharedAccessBlobPermissions.WRITE, 1); + String sasToken = container.generateSharedAccessSignature(policy, null); + String containerSasUrl = String.format("%s?%s", container.getUri().toString(), sasToken); + OutputFileBlobContainerDestination outputFileBlobContainerDestination = + new OutputFileBlobContainerDestination() + .withContainerUrl(containerSasUrl); + + List outputFiles = new ArrayList<>(); + OutputFile outputFile = new OutputFile() + .withFilePattern(filePattern) + .withDestination(new OutputFileDestination() + .withContainer(outputFileBlobContainerDestination)) + .withUploadOptions(new OutputFileUploadOptions() + .withUploadCondition(OutputFileUploadCondition.TASK_COMPLETION)); + + outputFiles.add(outputFile); + return outputFiles; + } + /** * Create a job with a single task * @@ -194,6 +222,8 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil * batch client instance * @param container * blob container to upload the resource file + * @param outputFileUploadContainer + * blob container to upload the output files generated during task execution * @param poolId * pool id * @param jobId @@ -204,7 +234,7 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil * @throws InvalidKeyException * @throws URISyntaxException */ - private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer container, String poolId, + private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer container, CloudBlobContainer outputFileUploadContainer, String poolId, String jobId) throws BatchErrorException, IOException, StorageException, InvalidKeyException, URISyntaxException { String BLOB_FILE_NAME = "test.txt"; @@ -217,7 +247,8 @@ private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer c // Create task TaskAddParameter taskToAdd = new TaskAddParameter(); - taskToAdd.withId("mytask").withCommandLine(String.format("cat %s", BLOB_FILE_NAME)); + // Create files named test2.txt and test3.csv. later we can transfer these files from vm to storage container using a file pattern + taskToAdd.withId("mytask").withCommandLine(String.format("cat %s && touch test2.txt && touch test3.csv", BLOB_FILE_NAME)); String sas = uploadFileToCloud(container, BLOB_FILE_NAME, LOCAL_FILE_PATH); @@ -228,6 +259,9 @@ private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer c files.add(file); taskToAdd.withResourceFiles(files); + // + taskToAdd.withOutputFiles(getOutputFiles(outputFileUploadContainer, "*.txt")); + // Add task to job client.taskOperations().createTask(jobId, taskToAdd); } @@ -319,7 +353,8 @@ public static void main(String argv[]) throws Exception { BatchClient client = BatchClient.open(cred); // Create storage container - CloudBlobContainer container = createBlobContainer(storageAccountName, storageAccountKey); + CloudBlobContainer container = createBlobContainer(storageAccountName, storageAccountKey, "poolsandresourcefiles"); + CloudBlobContainer taskOutputFileUploadContainer = createBlobContainer(storageAccountName, storageAccountKey, "taskoutputfiles"); String userName = System.getProperty("user.name"); String poolId = userName + "-pooltest"; @@ -328,7 +363,7 @@ public static void main(String argv[]) throws Exception { try { CloudPool sharedPool = createPoolIfNotExists(client, poolId); - submitJobAndAddTask(client, container, sharedPool.id(), jobId); + submitJobAndAddTask(client, container, taskOutputFileUploadContainer, sharedPool.id(), jobId); if (waitForTasksToComplete(client, jobId, TASK_COMPLETE_TIMEOUT)) { // Get the task command output file CloudTask task = client.taskOperations().getTask(jobId, "mytask"); @@ -364,6 +399,7 @@ public static void main(String argv[]) throws Exception { if (shouldDeleteContainer) { container.deleteIfExists(); + taskOutputFileUploadContainer.deleteIfExists(); } } } From 5e9d139ac6b6cc65197af851ec35939c14605e24 Mon Sep 17 00:00:00 2001 From: therahulkumar Date: Sat, 22 Jun 2019 13:58:06 +0530 Subject: [PATCH 2/4] Added a check to test the file is uploaded to container after task completion --- .../src/main/java/PoolAndResourceFile.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java index 76d14165..e2d30c0c 100644 --- a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java +++ b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java @@ -365,6 +365,10 @@ public static void main(String argv[]) throws Exception { CloudPool sharedPool = createPoolIfNotExists(client, poolId); submitJobAndAddTask(client, container, taskOutputFileUploadContainer, sharedPool.id(), jobId); if (waitForTasksToComplete(client, jobId, TASK_COMPLETE_TIMEOUT)) { + // Check of the task output files uploaded to container + for (ListBlobItem listBlobItem: taskOutputFileUploadContainer.listBlobs()) { + System.out.println("Uri of uploaded files after task completion: " + listBlobItem.getUri().toString()); + } // Get the task command output file CloudTask task = client.taskOperations().getTask(jobId, "mytask"); From 1b1d2b7234221c5b034ba1da6ef8444b693adbed Mon Sep 17 00:00:00 2001 From: therahulkumar Date: Sat, 22 Jun 2019 14:41:58 +0530 Subject: [PATCH 3/4] modified the command to be executed and added java doc string --- .../src/main/java/PoolAndResourceFile.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java index e2d30c0c..b4c6c244 100644 --- a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java +++ b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java @@ -148,6 +148,13 @@ private static CloudBlobContainer createBlobContainer(String storageAccountName, return blobClient.getContainerReference(containerName); } + /** + * @param sharedAccessBlobPermissions + * @param expiry + * expiry in number of days + * @return + * shared access blob policy with permission and expiry + */ private static SharedAccessBlobPolicy getSharedAccessBlobPolicy(SharedAccessBlobPermissions sharedAccessBlobPermissions, int expiry) { SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy(); EnumSet perEnumSet = EnumSet.of(sharedAccessBlobPermissions); @@ -193,7 +200,16 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil return blob.getUri() + "?" + sas; } - + /** + * + * @param container + * container to upload the file + * @param filePattern + * all the files matching the given pattern will be upload to container + * @return list of output files + * @throws StorageException + * @throws InvalidKeyException + */ private static List getOutputFiles(CloudBlobContainer container, String filePattern) throws StorageException, InvalidKeyException { // Create policy with 1 day write permission SharedAccessBlobPolicy policy = getSharedAccessBlobPolicy(SharedAccessBlobPermissions.WRITE, 1); @@ -248,7 +264,8 @@ private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer c // Create task TaskAddParameter taskToAdd = new TaskAddParameter(); // Create files named test2.txt and test3.csv. later we can transfer these files from vm to storage container using a file pattern - taskToAdd.withId("mytask").withCommandLine(String.format("cat %s && touch test2.txt && touch test3.csv", BLOB_FILE_NAME)); + String command = String.format("cat %s && cp %s test2.txt && cp %s test3.csv", BLOB_FILE_NAME, BLOB_FILE_NAME, BLOB_FILE_NAME); + taskToAdd.withId("mytask").withCommandLine(String.format("/bin/bash -c \"%s\"", command)); String sas = uploadFileToCloud(container, BLOB_FILE_NAME, LOCAL_FILE_PATH); @@ -355,6 +372,7 @@ public static void main(String argv[]) throws Exception { // Create storage container CloudBlobContainer container = createBlobContainer(storageAccountName, storageAccountKey, "poolsandresourcefiles"); CloudBlobContainer taskOutputFileUploadContainer = createBlobContainer(storageAccountName, storageAccountKey, "taskoutputfiles"); + taskOutputFileUploadContainer.createIfNotExists(); String userName = System.getProperty("user.name"); String poolId = userName + "-pooltest"; @@ -366,7 +384,7 @@ public static void main(String argv[]) throws Exception { submitJobAndAddTask(client, container, taskOutputFileUploadContainer, sharedPool.id(), jobId); if (waitForTasksToComplete(client, jobId, TASK_COMPLETE_TIMEOUT)) { // Check of the task output files uploaded to container - for (ListBlobItem listBlobItem: taskOutputFileUploadContainer.listBlobs()) { + for (ListBlobItem listBlobItem : taskOutputFileUploadContainer.listBlobs()) { System.out.println("Uri of uploaded files after task completion: " + listBlobItem.getUri().toString()); } // Get the task command output file From 87ae55ff66f0f3c83ae9ef1cabf493ebfe74243f Mon Sep 17 00:00:00 2001 From: therahulkumar Date: Mon, 24 Jun 2019 21:48:27 +0530 Subject: [PATCH 4/4] incorporating reviews --- .../src/main/java/PoolAndResourceFile.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java index b4c6c244..7e6f890e 100644 --- a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java +++ b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java @@ -149,6 +149,8 @@ private static CloudBlobContainer createBlobContainer(String storageAccountName, } /** + * Get shared access blob policy with access permissions and expiry to be used to create sas token + * * @param sharedAccessBlobPermissions * @param expiry * expiry in number of days @@ -201,6 +203,7 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil } /** + * Creates a list of output files matching a file pattern, these files will be uploaded to azure container after task completion * * @param container * container to upload the file @@ -210,7 +213,7 @@ private static String uploadFileToCloud(CloudBlobContainer container, String fil * @throws StorageException * @throws InvalidKeyException */ - private static List getOutputFiles(CloudBlobContainer container, String filePattern) throws StorageException, InvalidKeyException { + private static List createOutputFiles(CloudBlobContainer container, String filePattern) throws StorageException, InvalidKeyException { // Create policy with 1 day write permission SharedAccessBlobPolicy policy = getSharedAccessBlobPolicy(SharedAccessBlobPermissions.WRITE, 1); String sasToken = container.generateSharedAccessSignature(policy, null); @@ -236,7 +239,7 @@ private static List getOutputFiles(CloudBlobContainer container, Str * * @param client * batch client instance - * @param container + * @param resourceFileUploadContainer * blob container to upload the resource file * @param outputFileUploadContainer * blob container to upload the output files generated during task execution @@ -250,7 +253,7 @@ private static List getOutputFiles(CloudBlobContainer container, Str * @throws InvalidKeyException * @throws URISyntaxException */ - private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer container, CloudBlobContainer outputFileUploadContainer, String poolId, + private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer resourceFileUploadContainer, CloudBlobContainer outputFileUploadContainer, String poolId, String jobId) throws BatchErrorException, IOException, StorageException, InvalidKeyException, URISyntaxException { String BLOB_FILE_NAME = "test.txt"; @@ -267,7 +270,7 @@ private static void submitJobAndAddTask(BatchClient client, CloudBlobContainer c String command = String.format("cat %s && cp %s test2.txt && cp %s test3.csv", BLOB_FILE_NAME, BLOB_FILE_NAME, BLOB_FILE_NAME); taskToAdd.withId("mytask").withCommandLine(String.format("/bin/bash -c \"%s\"", command)); - String sas = uploadFileToCloud(container, BLOB_FILE_NAME, LOCAL_FILE_PATH); + String sas = uploadFileToCloud(resourceFileUploadContainer, BLOB_FILE_NAME, LOCAL_FILE_PATH); // Associate resource file with task ResourceFile file = new ResourceFile(); @@ -370,7 +373,7 @@ public static void main(String argv[]) throws Exception { BatchClient client = BatchClient.open(cred); // Create storage container - CloudBlobContainer container = createBlobContainer(storageAccountName, storageAccountKey, "poolsandresourcefiles"); + CloudBlobContainer resourceFileUploadContainer = createBlobContainer(storageAccountName, storageAccountKey, "poolsandresourcefiles"); CloudBlobContainer taskOutputFileUploadContainer = createBlobContainer(storageAccountName, storageAccountKey, "taskoutputfiles"); taskOutputFileUploadContainer.createIfNotExists(); @@ -381,7 +384,7 @@ public static void main(String argv[]) throws Exception { try { CloudPool sharedPool = createPoolIfNotExists(client, poolId); - submitJobAndAddTask(client, container, taskOutputFileUploadContainer, sharedPool.id(), jobId); + submitJobAndAddTask(client, resourceFileUploadContainer, taskOutputFileUploadContainer, sharedPool.id(), jobId); if (waitForTasksToComplete(client, jobId, TASK_COMPLETE_TIMEOUT)) { // Check of the task output files uploaded to container for (ListBlobItem listBlobItem : taskOutputFileUploadContainer.listBlobs()) { @@ -420,7 +423,7 @@ public static void main(String argv[]) throws Exception { } if (shouldDeleteContainer) { - container.deleteIfExists(); + resourceFileUploadContainer.deleteIfExists(); taskOutputFileUploadContainer.deleteIfExists(); } }