Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugin): Delete the compressed file after extraction #629

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public LocalFSDirectoryListing() {
this(Collections.emptyList());
}


/**
* Creates a new {@link LocalFSDirectoryListing} instance.
*
Expand Down Expand Up @@ -86,8 +85,7 @@ private Collection<FileObjectMeta> toSourceObjects(final Collection<File> allFil
} catch (ConnectFilePulseException e) {
LOG.warn(
"Failed to read metadata. Object file is ignored: {}",
e.getMessage()
);
e.getMessage());
return Optional.<LocalFileObjectMeta>empty();
}
})
Expand All @@ -105,22 +103,54 @@ public void setFilter(final FileListFilter filter) {

private List<File> listEligibleFiles(final Path input) {
final List<File> listingLocalFiles = new LinkedList<>();
if (!Files.isReadable(input)) {
LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", input);
return listingLocalFiles;
}

if (!Files.isDirectory(input)) {
LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", input);
if (!isPathReadable(input)) {
return listingLocalFiles;
}

if (isHidden(input)) {
if (!isPathDirectory(input) || isHidden(input)) {
return listingLocalFiles;
}

final List<Path> decompressedDirs = new LinkedList<>();
final List<Path> directories = new LinkedList<>();
processFiles(input, listingLocalFiles, directories, decompressedDirs);

if (config.isRecursiveScanEnable() && !directories.isEmpty()) {
listingLocalFiles.addAll(scanRecursiveDirectories(directories, decompressedDirs));
}
return listingLocalFiles;
}

private boolean isPathReadable(Path path) {
if (!Files.isReadable(path)) {
LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", path);
return false;
}
return true;
}

private boolean isPathDirectory(Path path) {
if (!Files.isDirectory(path)) {
LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", path);
return false;
}
return true;
}

private boolean isHidden(final Path input) {
try {
return Files.isHidden(input);
} catch (IOException e) {
LOG.warn(
"Error while checking if input file is hidden '{}': {}",
input,
e.getLocalizedMessage());
return false;
}
}

private void processFiles(Path input, List<File> listingLocalFiles, List<Path> directories,
List<Path> decompressedDirs) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(input)) {
for (Path path : stream) {
if (Files.isDirectory(path)) {
Expand All @@ -139,8 +169,14 @@ private List<File> listEligibleFiles(final Path input) {
final Path decompressed = codec.decompress(file).toPath();
listingLocalFiles.addAll(listEligibleFiles(decompressed));
decompressedDirs.add(decompressed);
} catch (IOException e) {
LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e);
LOG.debug("Compressed file extracted successfully : {}", path);
handleFileDeletion(file, path);
} catch (IOException | SecurityException e) {
if (e instanceof IOException) {
LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e);
} else if (e instanceof SecurityException) {
LOG.warn("Error while deleting input file '{}'. Skip and continue.", path, e);
}
}
} else {
// If no codec was found for the input file,
Expand All @@ -152,37 +188,29 @@ private List<File> listEligibleFiles(final Path input) {
}
}
} catch (IOException e) {
LOG.error(
"Error while getting directory listing for {}: {}",
input,
e.getLocalizedMessage()
);
LOG.error("Error while getting directory listing for {}: {}", input, e.getLocalizedMessage());
throw new ConnectException(e);
}

if (config.isRecursiveScanEnable() && !directories.isEmpty()) {
listingLocalFiles.addAll(directories.stream()
.filter(f -> !decompressedDirs.contains(f))
.flatMap(f -> listEligibleFiles(f).stream())
.collect(Collectors.toList())
);
}
return listingLocalFiles;
}

private boolean isHidden(final Path input) {
try {
return Files.isHidden(input);
} catch (IOException e) {
LOG.warn(
"Error while checking if input file is hidden '{}': {}",
input,
e.getLocalizedMessage()
);
return false;
private void handleFileDeletion(File file, Path path) {
if (config.isDeleteCompressFileEnable() && file.exists()) {
if (file.delete()) {
LOG.debug("Compressed file deleted successfully : {}", path);
} else {
LOG.warn("Error while deleting input file '{}'. Skip and continue.", path);
}
}
}

private List<File> scanRecursiveDirectories(List<Path> directories, List<Path> decompressedDirs) {
return directories.stream()
.filter(f -> !decompressedDirs.contains(f))
.flatMap(f -> listEligibleFiles(f).stream())
.collect(Collectors.toList());
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,37 @@

public class LocalFSDirectoryListingConfig extends AbstractConfig {


public static final String FS_LISTING_DIRECTORY_PATH = "fs.listing.directory.path";
public static final String FS_LISTING_DIRECTORY_DOC = "The input directory to scan";

public static final String FS_RECURSIVE_SCAN_ENABLE_CONFIG = "fs.listing.recursive.enabled";
private static final String FS_RECURSIVE_SCAN_ENABLE_DOC = "Boolean indicating whether local directory " +
"should be recursively scanned (default true).";
public static final String FS_RECURSIVE_SCAN_ENABLE_CONFIG = "fs.listing.recursive.enabled";
private static final String FS_RECURSIVE_SCAN_ENABLE_DOC = "Boolean indicating whether local directory " +
"should be recursively scanned (default true).";

public static final String FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG = "fs.delete.compress.files.enabled";
private static final String FS_DELETE_COMPRESS_FILES_ENABLE_DOC = "Flag indicating whether compressed file " +
"should be deleted after extraction (default false)";

public static ConfigDef getConf() {
return new ConfigDef()
.define(
FS_LISTING_DIRECTORY_PATH,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
FS_LISTING_DIRECTORY_DOC
)

.define(
FS_RECURSIVE_SCAN_ENABLE_CONFIG,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.MEDIUM,
FS_RECURSIVE_SCAN_ENABLE_DOC
);
.define(
FS_LISTING_DIRECTORY_PATH,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
FS_LISTING_DIRECTORY_DOC)

.define(
FS_RECURSIVE_SCAN_ENABLE_CONFIG,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.MEDIUM,
FS_RECURSIVE_SCAN_ENABLE_DOC)
.define(
FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.MEDIUM,
FS_DELETE_COMPRESS_FILES_ENABLE_DOC);
}

/**
Expand All @@ -53,4 +60,8 @@ public boolean isRecursiveScanEnable() {
public String listingDirectoryPath() {
return this.getString(FS_LISTING_DIRECTORY_PATH);
}

public boolean isDeleteCompressFileEnable() {
return getBoolean(FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class LocalFSDirectoryListingTest {

private static final String DEFAULT_ENTRY_FILE_NAME = "file-entry-0.txt";
private static final String DEFAULT_ARCHIVE_NAME = "archive";
private static final String DEFAULT_ARCHIVE_NAME = "archive";
private static final String TEST_SCAN_DIRECTORY = "test-scan";

@Rule
Expand All @@ -54,47 +54,79 @@ public void shouldExtractXZGipCompressedFilesPathWhileScanningGivenRecursiveScan
zos.closeEntry();
}

scanner.configure(new HashMap<>() {{
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}});
scanner.configure(new HashMap<>() {
{
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}
});

final Collection<FileObjectMeta> scanned = scanner.listObjects();
Assert.assertEquals(1, scanned.size());
String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(), DEFAULT_ARCHIVE_NAME, DEFAULT_ENTRY_FILE_NAME));
String expected = String.join(File.separator,
Arrays.asList(inputDirectory.getCanonicalPath(), DEFAULT_ARCHIVE_NAME, DEFAULT_ENTRY_FILE_NAME));
Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next()));
}

@Test
public void shouldExtractGzipCompressedFiles() throws IOException {
public void shouldExtractGzipCompressedFilesAndKeepGzipFileAfterExtraction() throws IOException {
File archiveFile = new File(inputDirectory, DEFAULT_ARCHIVE_NAME + ".gz");

try (GZIPOutputStream os = new GZIPOutputStream(new FileOutputStream(archiveFile))) {
byte[] data = "dummy".getBytes();
os.write(data, 0, data.length);
}

scanner.configure(new HashMap<>() {{
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}});
scanner.configure(new HashMap<>() {
{
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}
});

final Collection<FileObjectMeta> scanned = scanner.listObjects();
Assert.assertEquals(1, scanned.size());
String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(),
DEFAULT_ARCHIVE_NAME, DEFAULT_ARCHIVE_NAME));
Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next()));
Assert.assertTrue(archiveFile.exists());
}


@Test
public void shouldExtractGzipCompressedFilesAndDeleteGzipFileAfterExtraction() throws IOException {
File archiveFile = new File(inputDirectory, DEFAULT_ARCHIVE_NAME + ".gz");

try (GZIPOutputStream os = new GZIPOutputStream(new FileOutputStream(archiveFile))) {
byte[] data = "dummy".getBytes();
os.write(data, 0, data.length);
}

scanner.configure(new HashMap<>() {
{
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
put(LocalFSDirectoryListingConfig.FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG, true);
}
});

final Collection<FileObjectMeta> scanned = scanner.listObjects();
Assert.assertEquals(1, scanned.size());
String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(),
DEFAULT_ARCHIVE_NAME, DEFAULT_ARCHIVE_NAME));
Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next()));
Assert.assertTrue(!archiveFile.exists());
}

@Test
public void shouldListFilesGivenRecursiveScanEnable() throws IOException {
folder.newFolder(TEST_SCAN_DIRECTORY , "sub-directory");
folder.newFolder(TEST_SCAN_DIRECTORY, "sub-directory");
final File file1 = folder.newFile(TEST_SCAN_DIRECTORY + "/test-file1.txt");
final File file2 = folder.newFile(TEST_SCAN_DIRECTORY + "/sub-directory/test-file2.txt");

scanner.configure(new HashMap<String, Object>(){{
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, true);
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}});
scanner.configure(new HashMap<String, Object>() {
{
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, true);
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}
});

final Collection<String> scanned = scanner
.listObjects()
Expand All @@ -117,14 +149,16 @@ private String getCanonicalPath(final FileObjectMeta s) {

@Test
public void shouldListFilesGivenRecursiveScanDisable() throws IOException {
folder.newFolder(TEST_SCAN_DIRECTORY , "sub-directory");
folder.newFolder(TEST_SCAN_DIRECTORY, "sub-directory");
final File file1 = folder.newFile(TEST_SCAN_DIRECTORY + "/test-file1.txt");
folder.newFile(TEST_SCAN_DIRECTORY + "/sub-directory/test-file2.txt"); // will not be scanned

scanner.configure(new HashMap<String, Object>(){{
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}});
scanner.configure(new HashMap<String, Object>() {
{
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
}
});

final Collection<String> scanned = scanner
.listObjects()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,28 @@
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Assert;
import org.junit.Test;

public class DefaultOffsetPolicyTest {

private String getValidPath() {
String validPath = SystemUtils.OS_NAME;
if (validPath.contains("Windows")) {
return "C:\\tmp\\path";
} else {
return "/tmp/path";
}
}

private static final GenericFileObjectMeta metadata = new GenericFileObjectMeta(
URI.create("file:///tmp/path/test"),
"test",
0L,
123L,
new FileObjectMeta.ContentDigest("789", "dummy"),
Collections.singletonMap(LocalFileObjectMeta.SYSTEM_FILE_INODE_META_KEY, "456L")
);
Collections.singletonMap(LocalFileObjectMeta.SYSTEM_FILE_INODE_META_KEY, "456L"));

@Test(expected = IllegalArgumentException.class)
public void should_throw_illegal_argument_given_empty_strategy() {
Expand All @@ -46,7 +55,7 @@ public void should_throw_npe_given_unknown_strategy() {
public void should_get_offset_based_on_path() {
Map<String, Object> result = new DefaultSourceOffsetPolicy("PATH").toPartitionMap(metadata);
Assert.assertEquals(1, result.size());
Assert.assertEquals("/tmp/path", result.get("path"));
Assert.assertEquals(getValidPath(), result.get("path"));
}

@Test
Expand Down Expand Up @@ -76,7 +85,7 @@ public void should_get_offset_based_on_name() {
public void should_get_composed_offset_based_on_path_and_hash() {
Map<String, Object> result = new DefaultSourceOffsetPolicy("PATH+HASH").toPartitionMap(metadata);
Assert.assertEquals(2, result.size());
Assert.assertEquals("/tmp/path", result.get("path"));
Assert.assertEquals(getValidPath(), result.get("path"));
Assert.assertEquals("789", result.get("hash"));
}

Expand Down
Loading
Loading