@@ -131,11 +131,17 @@ public Path compress(String sourcePath, String destinationPath, String outputFil
                     shouldLogProgress, logProgressPercentInterval, verbosity, excludeFiles, outputFile).run();
             }
         } else {
+            long[] fileNumIntervals = TarLz4Util.getFileCountIntervalsFromSize(Path.of(sourcePath), numThreads);
+
+            // We may end up using fewer than numThreads threads if the way files are split covers all files early,
+            // or if there are fewer files than numThreads.
+            int actualNumThreads = (int) fileNumIntervals[fileNumIntervals.length - 2];
+
             // Reuse futures array
-            var futures = new Future[numThreads];
+            var futures = new Future[actualNumThreads];

             // Archive + Compression tasks
-            submitArchiveTasks(sourcePath, destinationPath, fileCount, futures);
+            submitArchiveTasks(sourcePath, destinationPath, fileCount, fileNumIntervals, actualNumThreads, futures);

             // At this point, we have all our .tmp files which are standalone .tar.lz4 compressed archives for each slice
             // The .tmp files can't be opened themselves however, as they are a sliced part of the final output file.
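Only the tail of the interval array is read here: the second-to-last element becomes the thread count and, in submitArchiveTasks below, the last element is the total byte size. A minimal sketch with hypothetical values, since the actual layout produced by TarLz4Util.getFileCountIntervalsFromSize is not shown in this diff:

    // Hypothetical example values; only the last two positions are read in this change.
    long[] fileNumIntervals = {0L, 1_200_000L, 2_750_000L, 3L, 41_337_552L};
    int actualNumThreads = (int) fileNumIntervals[fileNumIntervals.length - 2]; // -> 3 slices / threads
    long totalBytes = fileNumIntervals[fileNumIntervals.length - 1];            // -> 41_337_552 total bytes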
@@ -149,7 +155,7 @@ public Path compress(String sourcePath, String destinationPath, String outputFil
             // This is made possible with the AsynchronousFileChannel API, which allows for writing bytes directly into a file
             // at some specified offset position.

-            mergeTmpArchives(destinationPath, futures);
+            mergeTmpArchives(destinationPath, actualNumThreads, futures);
         }

         log.debug("Finished compressing {} files from source={} to destination={}", fileCount, sourcePath, destinationPath);
@@ -160,11 +166,10 @@ public Path compress(String sourcePath, String destinationPath, String outputFil
         }
     }

-    private void submitArchiveTasks(String sourcePath, String destinationPath, long fileCount, Future<?>[] futures)
+    private void submitArchiveTasks(String sourcePath, String destinationPath, long fileCount, long[] fileNumIntervals, int numThreads, Future<?>[] futures)
             throws IOException, ExecutionException, InterruptedException {
         // Get the file number intervals
         // TODO: Make it configurable to use file count vs this
-        long[] fileNumIntervals = TarLz4Util.getFileCountIntervalsFromSize(Path.of(sourcePath), numThreads);
         long totalBytes = fileNumIntervals[fileNumIntervals.length - 1];

         // In the multithreaded use case, we'll spin up `numThreads` threads, each writing to its own temporary file
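Since fileNumIntervals is now a parameter, each submitted task presumably covers one slice of those intervals and writes to its own .tmp file. A rough sketch of such a submission loop; the ArchiveSliceTask name and its constructor arguments are illustrative guesses, not code from this change:

    // Illustrative only: one archive task per slice, each writing destinationPath + "_" + i + TMP_SUFFIX.
    for (int i = 0; i < numThreads; i++) {
        tasks[i] = new ArchiveSliceTask(sourcePath,
                destinationPath + "_" + i + TMP_SUFFIX,         // per-slice temporary output
                fileNumIntervals[i], fileNumIntervals[i + 1]);  // hypothetical slice bounds
        futures[i] = executorService.submit(tasks[i]);
    }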
@@ -211,10 +216,8 @@ private void submitArchiveTasks(String sourcePath, String destinationPath, long
                 currPercent = 0;
                 isDone = true;
                 for (int i = 0; i < numThreads; i++) {
-                    if (futures[i] != null && tasks[i] != null) {
-                        currPercent += tasks[i].getBytesProcessed();
-                        isDone &= futures[i].isDone();
-                    }
+                    currPercent += tasks[i].getBytesProcessed();
+                    isDone &= futures[i].isDone();
                 }

                 currPercent = currPercent * 100 / totalBytes;
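The percentage above is plain integer arithmetic over byte counts, for example:

    long totalBytes = 41_337_552L;                  // fileNumIntervals[fileNumIntervals.length - 1]
    long currPercent = 10_000_000L;                 // sum of getBytesProcessed() across tasks
    currPercent = currPercent * 100 / totalBytes;   // integer division -> 24 (i.e. 24%)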
@@ -228,56 +231,44 @@ private void submitArchiveTasks(String sourcePath, String destinationPath, long

             // Wait for all futures to finish
             for (int i = 0; i < numThreads; i++) {
-                if (futures[i] != null) {
-                    futures[i].get();
-                    tasks[i].fos.close(); // Clean up and close the .tmp file OutputStreams
-                }
+                futures[i].get();
+                tasks[i].fos.close(); // Clean up and close the .tmp file OutputStreams
             }
             success = true;
         } finally {
             // Safety check
             if (!success) {
                 // Wait for all futures to finish
                 for (int i = 0; i < numThreads; i++) {
-                    if (futures[i] != null) {
-                        futures[i].get();
-                        tasks[i].fos.close(); // Clean up and close the .tmp file OutputStreams
-                    }
+                    futures[i].get();
+                    tasks[i].fos.close(); // Clean up and close the .tmp file OutputStreams
                 }
             }
         }

         log.debug("Finished compressing archive task for source={}, destination={}", sourcePath, destinationPath);
     }

-    private void mergeTmpArchives(String destinationPath, Future<?>[] futures) throws IOException, ExecutionException, InterruptedException {
+    private void mergeTmpArchives(String destinationPath, int numThreads, Future<?>[] futures) throws IOException, ExecutionException, InterruptedException {
         // Pre-check which indices of futures are nonEmpty
-        int n = 0;
-        for (int i = 0; i < futures.length; i++) {
-            if (futures[i] == null) {
-                break;
-            }
-            n++;
-        }
-
-        FileInputStream[] tmpFiles = new FileInputStream[n];
-        FileChannel[] tmpChannels = new FileChannel[n];
-        long[] fileChannelOffsets = new long[n];
+        FileInputStream[] tmpFiles = new FileInputStream[numThreads];
+        FileChannel[] tmpChannels = new FileChannel[numThreads];
+        long[] fileChannelOffsets = new long[numThreads];

         // This grabs a FileChannel to read for each .tmp file, and also calculates what all the fileChannel position offsets
         // we should use for each Thread, based on the size in bytes of each .tmp file
-        for (int i = 0; i < n; i++) {
+        for (int i = 0; i < numThreads; i++) {
             tmpFiles[i] = new FileInputStream(destinationPath + "_" + i + TMP_SUFFIX);
             tmpChannels[i] = tmpFiles[i].getChannel();
-            if (i < n - 1) {
+            if (i < numThreads - 1) {
                 fileChannelOffsets[i + 1] = fileChannelOffsets[i] + tmpChannels[i].size();
             }
         }

         // Create an AsynchronousFileChannel for the final output `.tar.lz4` file
         // This channel has the capability to WRITE to the file, or CREATE it if it doesn't yet exist
         AsynchronousFileChannel destChannel = AsynchronousFileChannel.open(Path.of(destinationPath), WRITE, CREATE);
-        for (int i = 0; i < n; i++) {
+        for (int i = 0; i < numThreads; i++) {
             int finalI = i;
             // Let's again spin up a thread for each .tmp file to write to its slice, or region in the final output file
             futures[i] = executorService.submit(() -> {
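The body of the submitted lambda falls between these two hunks and is not shown; conceptually each worker copies its whole .tmp file into the output at its precomputed offset. A minimal sketch of such a region copy using the tmpChannels, fileChannelOffsets, destChannel, and finalI variables above (the buffer size, loop shape, and error handling are assumptions, and java.nio.ByteBuffer is assumed to be imported):

    // Sketch: stream tmpChannels[finalI] into destChannel starting at this slice's offset.
    long position = fileChannelOffsets[finalI];
    ByteBuffer buffer = ByteBuffer.allocate(8192);                // assumed buffer size
    while (tmpChannels[finalI].read(buffer) > 0) {
        buffer.flip();
        while (buffer.hasRemaining()) {
            // AsynchronousFileChannel.write returns a Future<Integer>; block on it to keep the sketch simple
            position += destChannel.write(buffer, position).get();
        }
        buffer.clear();
    }
    return null; // assuming the lambda is submitted as a Callable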
@@ -316,7 +307,7 @@ private void mergeTmpArchives(String destinationPath, Future<?>[] futures) throw
             }

         // Wait for all futures to finish
-        for (int i = 0; i < n; i++) {
+        for (int i = 0; i < numThreads; i++) {
            futures[i].get();
         }
