Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not apply MTA archive size limit for transports with multiple MTAs #1551

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ public class Messages {
public static final String ERROR_WAITING_FOR_OPERATION_TO_FINISH = "Error waiting for operation to finish";
public static final String OPERATION_OF_SERVICE_BINDING_OR_KEY_IS_IN_PROGRESS = "Operation of service binding or key is in progress";
public static final String ARCHIVE_WAS_NOT_SPLIT_TOTAL_SIZE_IN_BYTES_0 = "Archive was not split! Total size in bytes: {0}";
public static final String SIZE_OF_MTAR_IS_AND_SIZE_OF_EXTENSION_DESCRIPTOR_ID = "Size of mtars is {0} and size of extension descriptors is {1}";
public static final String ARCHIVE_IS_SPLIT_TO_0_PARTS_TOTAL_SIZE_IN_BYTES_1_UPLOADING = "Archive was split to: {0} parts. Total size in bytes: {1}. Uploading started...";
public static final String SIZE_OF_APP_0_IS_1_BYTES = "Size of app {0} is {1} bytes";
public static final String SHOULD_UPDATE_SERVICE_KEY = "Service keys should be updated";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package org.cloudfoundry.multiapps.controller.process.steps;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.math.BigInteger;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

import org.apache.commons.io.IOUtils;
import org.cloudfoundry.multiapps.common.ContentException;
import org.cloudfoundry.multiapps.common.SLException;
Expand All @@ -19,12 +25,8 @@
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Scope;

import java.math.BigInteger;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Named("validateDeployParametersStep")
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
Expand Down Expand Up @@ -57,22 +59,34 @@ protected String getStepErrorMessage(ProcessContext context) {
}

private void validateParameters(ProcessContext context) {
validateExtensionDescriptorFileIds(context);
validateFilesSizeLimit(context);
validateArchive(context);
List<FileEntry> extensionDescriptors = validateExtensionDescriptorFileIds(context);
List<FileEntry> archivePartEntries = getArchivePartEntries(context);
validateFilesSizeLimit(context, archivePartEntries, extensionDescriptors);

if (archivePartEntries.size() == 1) {
getStepLogger().infoWithoutProgressMessage(Messages.ARCHIVE_WAS_NOT_SPLIT_TOTAL_SIZE_IN_BYTES_0, archivePartEntries.get(0)
.getSize());
} else {
mergeArchive(context, archivePartEntries);
}
}

private void validateExtensionDescriptorFileIds(ProcessContext context) {
private List<FileEntry> validateExtensionDescriptorFileIds(ProcessContext context) {
List<FileEntry> extensionDescriptors = new ArrayList<>();
String extensionDescriptorFileId = context.getVariable(Variables.EXT_DESCRIPTOR_FILE_ID);

if (extensionDescriptorFileId == null) {
return;
return List.of();
}

String[] extensionDescriptorFileIds = extensionDescriptorFileId.split(",");
for (String fileId : extensionDescriptorFileIds) {
FileEntry file = findFile(context, fileId);
validateDescriptorSize(file);
extensionDescriptors.add(file);
Yavor16 marked this conversation as resolved.
Show resolved Hide resolved
}

return extensionDescriptors;
}

private FileEntry findFile(ProcessContext context, String fileId) {
Expand All @@ -94,27 +108,37 @@ private void validateDescriptorSize(FileEntry file) {
.compareTo(BigInteger.valueOf(maxSizeLimit)) > 0) {
throw new SLException(org.cloudfoundry.multiapps.mta.Messages.ERROR_SIZE_OF_FILE_EXCEEDS_CONFIGURED_MAX_SIZE_LIMIT,
file.getSize()
.toString(), file.getName(), String.valueOf(maxSizeLimit.longValue()));
.toString(),
file.getName(),
String.valueOf(maxSizeLimit.longValue()));
}
}

private void validateFilesSizeLimit(ProcessContext context) {
private void validateFilesSizeLimit(ProcessContext context, List<FileEntry> archivePartEntries, List<FileEntry> extensionDescriptors) {
try {
checkFileSizeOfAllFiles(context);
checkFileSizeOfAllFiles(context, archivePartEntries, extensionDescriptors);
} catch (FileStorageException e) {
throw new SLException(e, MessageFormat.format(Messages.ERROR_OCURRED_DURING_VALIDATION_OF_FILES_0, e.getMessage()));
}

}

private void checkFileSizeOfAllFiles(ProcessContext context) throws FileStorageException {
private void checkFileSizeOfAllFiles(ProcessContext context, List<FileEntry> archivePartEntries, List<FileEntry> extensionDescriptors)
throws FileStorageException {
long maxFileSizeLimit = configuration.getMaxUploadSize();
List<FileEntry> fileEntries = fileService.listFilesBySpaceAndOperationId(context.getVariable(Variables.SPACE_GUID),
context.getVariable(Variables.CORRELATION_ID));
long sizeOfAllFiles = getSizeOfAllFiles(fileEntries);
if (sizeOfAllFiles >= maxFileSizeLimit) {
deleteFiles(context, fileEntries);
throw new ContentException(Messages.SIZE_OF_ALL_OPERATIONS_FILES_0_EXCEEDS_MAX_UPLOAD_SIZE_1, sizeOfAllFiles, maxFileSizeLimit);
long sizeOfAllArchivePartEntries = getSizeOfAllFiles(archivePartEntries);
long sizeOfExtensionDescriptorsEntries = getSizeOfAllFiles(extensionDescriptors);
Yavor16 marked this conversation as resolved.
Show resolved Hide resolved
long sizeOfAllFileEntries = sizeOfAllArchivePartEntries + sizeOfExtensionDescriptorsEntries;

getStepLogger().infoWithoutProgressMessage(Messages.SIZE_OF_MTAR_IS_AND_SIZE_OF_EXTENSION_DESCRIPTOR_ID,
sizeOfAllArchivePartEntries, sizeOfExtensionDescriptorsEntries);

if (sizeOfAllFileEntries > maxFileSizeLimit) {
deleteFiles(context, archivePartEntries);
Yavor16 marked this conversation as resolved.
Show resolved Hide resolved
deleteFiles(context, extensionDescriptors);
throw new ContentException(Messages.SIZE_OF_ALL_OPERATIONS_FILES_0_EXCEEDS_MAX_UPLOAD_SIZE_1,
sizeOfAllFileEntries,
maxFileSizeLimit);
}
}

Expand All @@ -126,21 +150,26 @@ private long getSizeOfAllFiles(List<FileEntry> fileEntries) {
}

private void deleteFiles(ProcessContext context, List<FileEntry> fileEntries) throws FileStorageException {
FileSweeper fileSweeper = new FileSweeper(context.getVariable(Variables.SPACE_GUID), fileService,
FileSweeper fileSweeper = new FileSweeper(context.getVariable(Variables.SPACE_GUID),
fileService,
context.getVariable(Variables.CORRELATION_ID));
fileSweeper.sweep(fileEntries);
}

private void validateArchive(ProcessContext context) {
private List<FileEntry> getArchivePartEntries(ProcessContext context) {
Yavor16 marked this conversation as resolved.
Show resolved Hide resolved
String[] archivePartIds = getArchivePartIds(context);
if (archivePartIds.length == 1) {
// TODO The merging of chunks should be done prior to this step
FileEntry archiveFileEntry = findFile(context, archivePartIds[0]);
getStepLogger().infoWithoutProgressMessage(Messages.ARCHIVE_WAS_NOT_SPLIT_TOTAL_SIZE_IN_BYTES_0, archiveFileEntry.getSize());
return;
return List.of(archiveFileEntry);
}
List<FileEntry> archivePartEntries = getArchivePartEntries(context, archivePartIds);
context.setVariable(Variables.FILE_ENTRIES, archivePartEntries);

return archivePartEntries;
}

private void mergeArchive(ProcessContext context, List<FileEntry> archivePartEntries) {
BigInteger archiveSize = calculateArchiveSize(archivePartEntries);
resilientOperationExecutor.execute(() -> mergeArchive(context, archivePartEntries, archiveSize));
}
Expand All @@ -152,9 +181,9 @@ private void mergeArchive(ProcessContext context, List<FileEntry> archivePartEnt
archivePartEntries.size(), archiveSize);
FileEntry uploadedArchive = persistArchive(archiveStreamWithName, context, archiveSize);
context.setVariable(Variables.APP_ARCHIVE_ID, uploadedArchive.getId());
getStepLogger().infoWithoutProgressMessage(
MessageFormat.format(Messages.ARCHIVE_WITH_ID_0_AND_NAME_1_WAS_STORED, uploadedArchive.getId(),
archiveStreamWithName.getArchiveName()));
getStepLogger().infoWithoutProgressMessage(MessageFormat.format(Messages.ARCHIVE_WITH_ID_0_AND_NAME_1_WAS_STORED,
uploadedArchive.getId(),
archiveStreamWithName.getArchiveName()));
} finally {
IOUtils.closeQuietly(archiveStreamWithName.getArchiveStream());
}
Expand Down Expand Up @@ -183,8 +212,8 @@ private MergedArchiveStreamCreator getMergedArchiveStreamCreator(List<FileEntry>

private FileEntry persistArchive(ArchiveStreamWithName archiveStreamWithName, ProcessContext context, BigInteger size) {
try {
return fileStorageThreadPool.submit(
new PriorityCallable<>(PriorityFuture.Priority.HIGHEST, () -> doPersistArchive(archiveStreamWithName, context, size)))
return fileStorageThreadPool.submit(new PriorityCallable<>(PriorityFuture.Priority.HIGHEST,
() -> doPersistArchive(archiveStreamWithName, context, size)))
.get();
} catch (ExecutionException | InterruptedException e) {
throw new SLException(e.getMessage(), e);
Expand All @@ -200,7 +229,8 @@ private FileEntry doPersistArchive(ArchiveStreamWithName archiveStreamWithName,
.operationId(context.getExecution()
.getProcessInstanceId())
.size(size)
.build(), archiveStreamWithName.getArchiveStream());
.build(),
archiveStreamWithName.getArchiveStream());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ private void prepareFileService(String appArchiveId) throws FileStorageException
when(fileService.getFile("space-id", MERGED_ARCHIVE_NAME + ".part.2"))
.thenReturn(createFileEntry(MERGED_ARCHIVE_NAME + ".part.2", MERGED_ARCHIVE_NAME + ".part.2", 1024 * 1024L));

when(fileService.getFile("space-id", EXCEEDING_FILE_SIZE_ID + ".part.0"))
.thenReturn(createFileEntry(MERGED_ARCHIVE_NAME + ".part.0", MERGED_ARCHIVE_NAME + ".part.0", 1024 * 1024 * 1024));

when(fileService.getFile("space-id", EXCEEDING_FILE_SIZE_ID + ".part.1"))
.thenReturn(createFileEntry(MERGED_ARCHIVE_NAME + ".part.1", MERGED_ARCHIVE_NAME + ".part.1", 1024 * 1024 * 1024));

when(fileService.getFile("space-id", EXCEEDING_FILE_SIZE_ID + ".part.2"))
.thenReturn(createFileEntry(MERGED_ARCHIVE_NAME + ".part.2", MERGED_ARCHIVE_NAME + ".part.2", 1024 * 1024 * 1024));

when(fileService.getFile("space-id", EXCEEDING_FILE_SIZE_ID + ".part.3"))
.thenReturn(createFileEntry(MERGED_ARCHIVE_NAME + ".part.3", MERGED_ARCHIVE_NAME + ".part.3", 1024 * 1024 * 1024));

when(fileService.getFile("space-id", EXCEEDING_FILE_SIZE_ID + ".part.4"))
.thenReturn(createFileEntry(MERGED_ARCHIVE_NAME + ".part.4", MERGED_ARCHIVE_NAME + ".part.4", 1024 * 1024 * 1024));

when(fileService.getFile("space-id", EXISTING_BIGGER_FILE_ID))
.thenReturn(createFileEntry(EXISTING_BIGGER_FILE_ID, "extDescriptorFile", 1024 * 1024L + 1));
when(fileService.getFile("space-id", NOT_EXISTING_FILE_ID))
Expand Down
Loading