Skip to content

Commit 84f38cc

Browse files
Store async upload job in the request's thread
1 parent dce2536 commit 84f38cc

File tree

3 files changed

+75
-43
lines changed

3 files changed

+75
-43
lines changed

multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public final class Messages {
2020
public static final String CANNOT_PARSE_CONTAINER_URI_OF_OBJECT_STORE = "Cannot parse container_uri of object store";
2121
public static final String UNSUPPORTED_SERVICE_PLAN_FOR_OBJECT_STORE = "Unsupported service plan for object store!";
2222
public static final String REQUEST_0_1_FAILED_WITH_2 = "Request \"{0} {1}\" failed with \"{2}\"";
23+
public static final String ERROR_OCCURRED_WHILE_DELETING_JOB_ENTRY = "Error occurred while deleting job entry";
2324

2425
// Audit log messages
2526

@@ -63,7 +64,7 @@ public final class Messages {
6364
public static final String CREATING_ASYNC_UPLOAD_JOB = "Creating async upload job for URL {} with ID: {}";
6465
public static final String ASYNC_UPLOAD_JOB_REJECTED = "Async upload job {} rejected. Deleting entry";
6566
public static final String STARTING_DOWNLOAD_OF_MTAR = "Starting download of MTAR from remote endpoint: {}";
66-
public static final String UPLOADED_MTAR_FROM_REMOTE_ENDPOINT = "Uploaded MTAR from remote endpoint {} in {} ms";
67+
public static final String UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID = "Uploaded MTAR from remote endpoint {} with job id: {} in {} ms";
6768
public static final String ASYNC_UPLOAD_JOB_FINISHED = "Async upload job {} finished";
6869
public static final String UPLOADING_MTAR_STREAM_FROM_REMOTE_ENDPOINT = "Uploading MTAR stream from remote endpoint: {}";
6970
public static final String CALLING_REMOTE_MTAR_ENDPOINT = "Calling remote MTAR endpoint {}";

multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,14 @@ public ResponseEntity<FileMetadata> uploadFile(MultipartHttpServletRequest reque
113113
var multipartFile = getFileFromRequest(request);
114114
try (InputStream in = new BufferedInputStream(multipartFile.getInputStream(), INPUT_STREAM_BUFFER_SIZE)) {
115115
var startTime = LocalDateTime.now();
116-
FileEntry fileEntry = fileService.addFile(spaceGuid, namespace, multipartFile.getOriginalFilename(), in, multipartFile.getSize());
116+
FileEntry fileEntry = fileService.addFile(spaceGuid, namespace, multipartFile.getOriginalFilename(), in,
117+
multipartFile.getSize());
117118
FileMetadata file = parseFileEntry(fileEntry);
118119
AuditLoggingProvider.getFacade()
119120
.logConfigCreate(file);
120121
var endTime = LocalDateTime.now();
121-
LOGGER.trace(Messages.UPLOADED_FILE, file.getId(), file.getName(), file.getSize(), file.getDigest(),
122-
file.getDigestAlgorithm(), ChronoUnit.MILLIS.between(startTime, endTime));
122+
LOGGER.trace(Messages.UPLOADED_FILE, file.getId(), file.getName(), file.getSize(), file.getDigest(), file.getDigestAlgorithm(),
123+
ChronoUnit.MILLIS.between(startTime, endTime));
123124
return ResponseEntity.status(HttpStatus.CREATED)
124125
.body(file);
125126
} catch (Exception e) {
@@ -145,19 +146,32 @@ public ResponseEntity<Void> startUploadFromUrl(String spaceGuid, String namespac
145146
var entry = createJobEntry(spaceGuid, namespace, urlWithoutUserInfo);
146147
LOGGER.debug(Messages.CREATING_ASYNC_UPLOAD_JOB, urlWithoutUserInfo, entry.getId());
147148
try {
149+
uploadJobService.add(entry);
148150
deployFromUrlExecutor.execute(() -> uploadFileFromUrl(entry, spaceGuid, namespace, decodedUrl));
149151
} catch (RejectedExecutionException ignored) {
150152
LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_REJECTED, entry.getId());
153+
deleteAsyncJobEntry(entry);
151154
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
152155
.header(HttpHeaders.RETRY_AFTER, RETRY_AFTER_SECONDS)
153156
.build();
154157
}
155158
return ResponseEntity.accepted()
156-
.header("x-cf-app-instance", configuration.getApplicationGuid() + ":" + configuration.getApplicationInstanceIndex())
159+
.header("x-cf-app-instance",
160+
configuration.getApplicationGuid() + ":" + configuration.getApplicationInstanceIndex())
157161
.header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, entry.getId()))
158162
.build();
159163
}
160164

165+
private void deleteAsyncJobEntry(AsyncUploadJobEntry entry) {
166+
try {
167+
uploadJobService.createQuery()
168+
.id(entry.getId())
169+
.delete();
170+
} catch (Exception e) {
171+
LOGGER.error(Messages.ERROR_OCCURRED_WHILE_DELETING_JOB_ENTRY, e);
172+
}
173+
}
174+
161175
private String getLocationHeader(String spaceGuid, String jobId) {
162176
return "spaces/" + spaceGuid + "/files/jobs/" + jobId;
163177
}
@@ -259,8 +273,8 @@ private AsyncUploadJobEntry getJob(String id, String spaceGuid, String namespace
259273
try {
260274
return uploadJobService.createQuery()
261275
.id(id)
262-
//even though the ID fully qualifies the job, we add these filters
263-
//to prevent accessing a job from another space, namespace or a different user
276+
// even though the ID fully qualifies the job, we add these filters
277+
// to prevent accessing a job from another space, namespace or a different user
264278
.spaceGuid(spaceGuid)
265279
.user(SecurityContextUtil.getUsername())
266280
.namespace(namespace)
@@ -280,29 +294,32 @@ private AsyncUploadResult createErrorResult(String error) {
280294
private void uploadFileFromUrl(AsyncUploadJobEntry jobEntry, String spaceGuid, String namespace, String fileUrl) {
281295
var counter = new AtomicLong(0);
282296
jobCounters.put(jobEntry.getId(), counter);
297+
LOGGER.debug(Messages.STARTING_DOWNLOAD_OF_MTAR, jobEntry.getUrl());
298+
var startTime = LocalDateTime.now();
299+
AsyncUploadJobEntry jobEntryWithTimestamp = ImmutableAsyncUploadJobEntry.copyOf(jobEntry)
300+
.withState(State.RUNNING)
301+
.withStartedAt(startTime);
283302
try {
284-
uploadJobService.add(jobEntry);
285-
LOGGER.debug(Messages.STARTING_DOWNLOAD_OF_MTAR, jobEntry.getUrl());
286-
var startTime = LocalDateTime.now();
287-
uploadJobService.update(jobEntry, ImmutableAsyncUploadJobEntry.copyOf(jobEntry)
288-
.withState(State.RUNNING)
289-
.withStartedAt(startTime));
290-
FileEntry fileEntry = resilientOperationExecutor.execute((CheckedSupplier<FileEntry>) () -> doUploadFileFromUrl(spaceGuid, namespace, fileUrl, counter));
291-
LOGGER.trace(Messages.UPLOADED_MTAR_FROM_REMOTE_ENDPOINT, jobEntry.getUrl(),
303+
jobEntryWithTimestamp = uploadJobService.update(jobEntry, jobEntryWithTimestamp);
304+
FileEntry fileEntry = resilientOperationExecutor.execute((CheckedSupplier<FileEntry>) () -> doUploadFileFromUrl(spaceGuid,
305+
namespace,
306+
fileUrl,
307+
counter));
308+
LOGGER.trace(Messages.UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID, jobEntry.getUrl(), jobEntry.getId(),
292309
ChronoUnit.MILLIS.between(startTime, LocalDateTime.now()));
293310

294311
var descriptor = fileService.processFileContent(spaceGuid, fileEntry.getId(), this::extractDeploymentDescriptor);
295312
LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_FINISHED, jobEntry.getId());
296-
uploadJobService.update(jobEntry, ImmutableAsyncUploadJobEntry.copyOf(jobEntry)
297-
.withFileId(fileEntry.getId())
298-
.withMtaId(descriptor.getId())
299-
.withFinishedAt(LocalDateTime.now())
300-
.withState(State.FINISHED));
313+
uploadJobService.update(jobEntryWithTimestamp, ImmutableAsyncUploadJobEntry.copyOf(jobEntryWithTimestamp)
314+
.withFileId(fileEntry.getId())
315+
.withMtaId(descriptor.getId())
316+
.withFinishedAt(LocalDateTime.now())
317+
.withState(State.FINISHED));
301318
} catch (Exception e) {
302319
LOGGER.error(MessageFormat.format(Messages.ASYNC_UPLOAD_JOB_FAILED, jobEntry.getId(), e.getMessage()), e);
303-
uploadJobService.update(jobEntry, ImmutableAsyncUploadJobEntry.copyOf(jobEntry)
304-
.withError(e.getMessage())
305-
.withState(State.ERROR));
320+
uploadJobService.update(jobEntryWithTimestamp, ImmutableAsyncUploadJobEntry.copyOf(jobEntryWithTimestamp)
321+
.withError(e.getMessage())
322+
.withState(State.ERROR));
306323
}
307324
}
308325

@@ -325,11 +342,11 @@ private FileEntry doUploadFileFromUrl(String spaceGuid, String namespace, String
325342

326343
String fileName = extractFileName(fileUrl);
327344
FileUtils.validateFileHasExtension(fileName);
328-
counter.set(0); //reset counter on retry
345+
counter.set(0); // reset counter on retry
329346
// Normal stream returned from the http response always returns 0 when InputStream::available() is executed which seems to break
330347
// JClods library: https://issues.apache.org/jira/browse/JCLOUDS-1623
331348
try (CountingInputStream source = new CountingInputStream(response.body(), counter);
332-
BufferedInputStream bufferedContent = new BufferedInputStream(source, INPUT_STREAM_BUFFER_SIZE)) {
349+
BufferedInputStream bufferedContent = new BufferedInputStream(source, INPUT_STREAM_BUFFER_SIZE)) {
333350
LOGGER.debug(Messages.UPLOADING_MTAR_STREAM_FROM_REMOTE_ENDPOINT, response.uri());
334351
return fileService.addFile(spaceGuid, namespace, fileName, bufferedContent, fileSize);
335352
}
@@ -342,8 +359,8 @@ private HttpResponse<InputStream> callRemoteEndpointWithRetry(HttpClient client,
342359
var response = client.send(request, BodyHandlers.ofInputStream());
343360
if (response.statusCode() / 100 != 2) {
344361
String error = readErrorBodyFromResponse(response);
345-
throw new SLException(MessageFormat.format(Messages.ERROR_FROM_REMOTE_MTAR_ENDPOINT, request.uri(),
346-
response.statusCode(), error));
362+
throw new SLException(MessageFormat.format(Messages.ERROR_FROM_REMOTE_MTAR_ENDPOINT, request.uri(), response.statusCode(),
363+
error));
347364
}
348365
return response;
349366
});

multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.cloudfoundry.multiapps.controller.web.api.impl;
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.mockito.ArgumentMatchers.any;
45
import static org.mockito.Mockito.when;
56

67
import java.io.InputStream;
@@ -19,6 +20,8 @@
1920
import java.util.UUID;
2021
import java.util.concurrent.Executor;
2122

23+
import javax.persistence.NoResultException;
24+
2225
import org.apache.commons.lang3.RandomStringUtils;
2326
import org.cloudfoundry.multiapps.common.SLException;
2427
import org.cloudfoundry.multiapps.controller.api.model.AsyncUploadResult;
@@ -59,8 +62,6 @@
5962
import org.springframework.web.multipart.MultipartFile;
6063
import org.springframework.web.multipart.MultipartHttpServletRequest;
6164

62-
import javax.persistence.NoResultException;
63-
6465
class FilesApiServiceImplTest {
6566

6667
@Mock
@@ -125,8 +126,9 @@ public void initialize() throws Exception {
125126
Runnable r = invocationOnMock.getArgument(0);
126127
r.run();
127128
return null;
128-
}).when(asyncFileUploadExecutor)
129-
.execute(Mockito.any());
129+
})
130+
.when(asyncFileUploadExecutor)
131+
.execute(Mockito.any());
130132
}
131133

132134
@Test
@@ -176,8 +178,8 @@ void testUploadMtaFile() throws Exception {
176178
Mockito.verify(file)
177179
.getInputStream();
178180
Mockito.verify(fileService)
179-
.addFile(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE_GUID), Mockito.eq(fileName),
180-
Mockito.any(InputStream.class), Mockito.eq(fileSize));
181+
.addFile(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE_GUID), Mockito.eq(fileName), Mockito.any(InputStream.class),
182+
Mockito.eq(fileSize));
181183

182184
FileMetadata fileMetadata = response.getBody();
183185
assertMetadataMatches(fileEntry, fileMetadata);
@@ -206,14 +208,17 @@ void testUploadFileFromUrl() throws Exception {
206208
.thenReturn(jobEntry);
207209
Mockito.when(uploadJobService.createQuery())
208210
.thenReturn(query);
211+
Mockito.when(uploadJobService.update(any(), any()))
212+
.thenReturn(jobEntry);
209213

210-
Mockito.when(fileService.addFile(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE_GUID), Mockito.eq(fileName),
211-
Mockito.any(), Mockito.eq(20L)))
214+
Mockito.when(fileService.addFile(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE_GUID), Mockito.eq(fileName), Mockito.any(),
215+
Mockito.eq(20L)))
212216
.thenReturn(fileEntry);
213217
Mockito.when(fileService.getFile(Mockito.eq(SPACE_GUID), Mockito.eq(fileEntry.getId())))
214218
.thenReturn(fileEntry);
215219

216-
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID, ImmutableFileUrl.of(FILE_URL));
220+
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID,
221+
ImmutableFileUrl.of(FILE_URL));
217222

218223
assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
219224

@@ -226,8 +231,7 @@ void testUploadFileFromUrl() throws Exception {
226231
assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.CREATED);
227232

228233
Mockito.verify(fileService)
229-
.addFile(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE_GUID), Mockito.eq(fileName), Mockito.any(),
230-
Mockito.eq(20L));
234+
.addFile(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE_GUID), Mockito.eq(fileName), Mockito.any(), Mockito.eq(20L));
231235

232236
var responseBody = uploadJobResponse.getBody();
233237
var fileMetadata = responseBody.getFile();
@@ -247,7 +251,7 @@ void testUploadFileFromUrlWithInvalidJobId() {
247251
.singleResult();
248252

249253
Mockito.when(uploadJobService.createQuery())
250-
.thenReturn(query);
254+
.thenReturn(query);
251255

252256
ResponseEntity<AsyncUploadResult> response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE_GUID, "invalid");
253257

@@ -273,8 +277,11 @@ void testFileUrlDoesntReturnContentLength() throws Exception {
273277
.thenReturn(jobEntry);
274278
Mockito.when(uploadJobService.createQuery())
275279
.thenReturn(query);
280+
Mockito.when(uploadJobService.update(any(), any()))
281+
.thenReturn(jobEntry);
276282

277-
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID, ImmutableFileUrl.of(FILE_URL));
283+
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID,
284+
ImmutableFileUrl.of(FILE_URL));
278285

279286
assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
280287

@@ -312,8 +319,11 @@ void testFileUrlReturnsContentLengthAboveMaxUploadSize() throws Exception {
312319
.thenReturn(jobEntry);
313320
Mockito.when(uploadJobService.createQuery())
314321
.thenReturn(query);
322+
Mockito.when(uploadJobService.update(any(), any()))
323+
.thenReturn(jobEntry);
315324

316-
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID, ImmutableFileUrl.of(FILE_URL));
325+
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID,
326+
ImmutableFileUrl.of(FILE_URL));
317327

318328
assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
319329

@@ -331,7 +341,7 @@ void testFileUrlReturnsContentLengthAboveMaxUploadSize() throws Exception {
331341
}
332342

333343
@ParameterizedTest
334-
@ValueSource(strings = {"https://host.domain/path/file?query=true", "http://host.domain/path/file.mtar?query=true"})
344+
@ValueSource(strings = { "https://host.domain/path/file?query=true", "http://host.domain/path/file.mtar?query=true" })
335345
void testUploadFileWithInvalidUrl(String url) throws Exception {
336346
AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
337347
HttpHeaders headers = HttpHeaders.of(Map.of("Content-Length", List.of("20")), (a, b) -> true);
@@ -349,11 +359,14 @@ void testUploadFileWithInvalidUrl(String url) throws Exception {
349359
.thenReturn(jobEntry);
350360
Mockito.when(uploadJobService.createQuery())
351361
.thenReturn(query);
362+
Mockito.when(uploadJobService.update(any(), any()))
363+
.thenReturn(jobEntry);
352364

353365
String invalidFileUrl = Base64.getUrlEncoder()
354366
.encodeToString(url.getBytes(StandardCharsets.UTF_8));
355367

356-
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID, ImmutableFileUrl.of(invalidFileUrl));
368+
ResponseEntity<Void> startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE_GUID,
369+
ImmutableFileUrl.of(invalidFileUrl));
357370

358371
assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
359372

@@ -400,6 +413,7 @@ private AsyncUploadJobEntry mockUploadJobEntry(String fileId, AsyncUploadJobEntr
400413
when(jobEntry.getStartedAt()).thenReturn(LocalDateTime.MIN);
401414
when(jobEntry.getFileId()).thenReturn(fileId);
402415
when(jobEntry.getState()).thenReturn(jobState);
416+
when(jobEntry.getUrl()).thenReturn("https://artifactory.sap/mta");
403417
if (jobState == AsyncUploadJobEntry.State.ERROR) {
404418
when(jobEntry.getError()).thenReturn(error);
405419
}

0 commit comments

Comments
 (0)