Skip to content

Commit

Permalink
[Kernel] Cleanup the FileNames.java utility methods
Browse files Browse the repository at this point in the history
Signed-off-by: Tai Le Manh <manhtai.lmt@gmail.com>
  • Loading branch information
tlm365 committed Aug 21, 2024
1 parent 01bf607 commit 69f9c90
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.delta.kernel.internal;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.fs.Path.getName;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
Expand Down Expand Up @@ -92,11 +91,11 @@ public static long getEarliestRecreatableCommit(Engine engine, Path logPath)
throws TableNotFoundException {
try (CloseableIterator<FileStatus> files =
listFrom(engine, logPath, 0)
.filter(
fs ->
FileNames.isCommitFile(getName(fs.getPath()))
|| FileNames.isCheckpointFile(getName(fs.getPath())))) {

.filter(fs -> {
final Path path = new Path(fs.getPath());
return FileNames.isCommitFile(path)
|| FileNames.isCheckpointFile(path);
})) {
if (!files.hasNext()) {
// listFrom already throws an error if the directory is truly empty, thus this must
// be because no files are checkpoint or delta files
Expand All @@ -114,8 +113,8 @@ public static long getEarliestRecreatableCommit(Engine engine, Path logPath)
// remember it and return it once we detect that we've seen a smaller or equal delta
// version.
while (files.hasNext()) {
String nextFilePath = files.next().getPath();
if (FileNames.isCommitFile(getName(nextFilePath))) {
final Path nextFilePath = new Path(files.next().getPath());
if (FileNames.isCommitFile(nextFilePath)) {
long version = FileNames.deltaVersion(nextFilePath);
if (version == 0L) {
return version;
Expand Down Expand Up @@ -200,7 +199,7 @@ private static List<Commit> getCommits(Engine engine, Path logPath, long start)
throws TableNotFoundException {
CloseableIterator<Commit> commits =
listFrom(engine, logPath, start)
.filter(fs -> FileNames.isCommitFile(getName(fs.getPath())))
.filter(fs -> FileNames.isCommitFile(new Path(fs.getPath())))
.map(fs -> new Commit(FileNames.deltaVersion(fs.getPath()), fs.getModificationTime()));
return monotonizeCommitTimestamps(commits);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public boolean usesSidecars() {

public final Optional<Path> filePath; // Guaranteed to be present for V2 checkpoints.

public CheckpointInstance(String path) {
public CheckpointInstance(Path path) {
Preconditions.checkArgument(
FileNames.isCheckpointFile(path), "not a valid checkpoint file name");

String[] pathParts = getPathName(path).split("\\.");
String[] pathParts = path.getName().split("\\.");

if (pathParts.length == 3 && pathParts[2].equals("parquet")) {
// Classic checkpoint 00000000000000000010.checkpoint.parquet
Expand All @@ -75,9 +75,9 @@ public CheckpointInstance(String path) {
this.version = Long.parseLong(pathParts[0]);
this.numParts = Optional.empty();
this.format = CheckpointFormat.V2;
this.filePath = Optional.of(new Path(path));
this.filePath = Optional.of(path);
} else {
throw new RuntimeException("Unrecognized checkpoint path format: " + getPathName(path));
throw new RuntimeException("Unrecognized checkpoint path format: " + path.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ public static Optional<CheckpointInstance> findLastCompleteCheckpointBefore(
List<CheckpointInstance> checkpoints = new ArrayList<>();
while (deltaLogFileIter.hasNext()) {
FileStatus fileStatus = deltaLogFileIter.next();
String fileName = new Path(fileStatus.getPath()).getName();
Path filePath = new Path(fileStatus.getPath());

long currentFileVersion;
if (FileNames.isCommitFile(fileName)) {
currentFileVersion = FileNames.deltaVersion(fileName);
} else if (FileNames.isCheckpointFile(fileName)) {
currentFileVersion = FileNames.checkpointVersion(fileName);
if (FileNames.isCommitFile(filePath)) {
currentFileVersion = FileNames.deltaVersion(filePath);
} else if (FileNames.isCheckpointFile(filePath)) {
currentFileVersion = FileNames.checkpointVersion(filePath);
} else {
// allow all other types of files.
currentFileVersion = currentVersion;
Expand All @@ -133,7 +133,7 @@ public static Optional<CheckpointInstance> findLastCompleteCheckpointBefore(
break;
}
if (validCheckpointFile(fileStatus)) {
checkpoints.add(new CheckpointInstance(fileStatus.getPath()));
checkpoints.add(new CheckpointInstance(filePath));
}
numberOfFilesSearched++;
}
Expand Down Expand Up @@ -162,7 +162,7 @@ public static Optional<CheckpointInstance> findLastCompleteCheckpointBefore(
}

private static boolean validCheckpointFile(FileStatus fileStatus) {
return FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName())
return FileNames.isCheckpointFile(new Path(fileStatus.getPath()))
&& fileStatus.getSize() > 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,4 @@ public void validateObject() throws InvalidObjectException {
throw new InvalidObjectException("No URI in deserialized Path");
}
}

public static String getName(String pathString) {
return new Path(pathString).getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ public ColumnarBatch extractSidecarsFromBatch(

// Sidecars will exist in schema. Extract sidecar files, then remove sidecar files from
// batch output.
List<DeltaLogFile> outputFiles = new ArrayList<>();
int sidecarIndex = columnarBatch.getSchema().fieldNames().indexOf(LogReplay.SIDECAR_FIELD_NAME);
ColumnVector sidecarVector = columnarBatch.getColumnVector(sidecarIndex);
for (int i = 0; i < columnarBatch.getSize(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.SetTransaction;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
Expand Down Expand Up @@ -238,7 +239,7 @@ private List<FileStatus> getWinningCommitFiles(Engine engine) {
List<FileStatus> winningCommitFiles = new ArrayList<>();
while (files.hasNext()) {
FileStatus file = files.next();
if (FileNames.isCommitFile(file.getPath())) {
if (FileNames.isCommitFile(new Path(file.getPath()))) {
winningCommitFiles.add(file);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ public enum LogType {
}

public static DeltaLogFile forCommitOrCheckpoint(FileStatus file) {
String fileName = new Path(file.getPath()).getName();
LogType logType = null;
long version = -1;
if (FileNames.isCommitFile(fileName)) {
Path filePath = new Path(file.getPath());
LogType logType;
long version;
if (FileNames.isCommitFile(filePath)) {
logType = LogType.COMMIT;
version = FileNames.deltaVersion(fileName);
} else if (FileNames.isClassicCheckpointFile(fileName)) {
version = FileNames.deltaVersion(filePath);
} else if (FileNames.isClassicCheckpointFile(filePath)) {
logType = LogType.CHECKPOINT_CLASSIC;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isMulitPartCheckpointFile(fileName)) {
version = FileNames.checkpointVersion(filePath);
} else if (FileNames.isMultiPartCheckpointFile(filePath)) {
logType = LogType.MULTIPART_CHECKPOINT;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isV2CheckpointFile(fileName)) {
version = FileNames.checkpointVersion(filePath);
} else if (FileNames.isV2CheckpointFile(filePath)) {
logType = LogType.V2_CHECKPOINT_MANIFEST;
version = FileNames.checkpointVersion(fileName);
version = FileNames.checkpointVersion(filePath);
} else {
throw new IllegalArgumentException(
"File is not a commit or checkpoint file: " + file.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable;
import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore;
import static io.delta.kernel.internal.fs.Path.getName;
import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.lang.String.format;
Expand Down Expand Up @@ -221,7 +220,8 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio
Checkpointer checkpointer = new Checkpointer(logPath);
checkpointer.writeLastCheckpointFile(engine, checkpointMetaData);

logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version);
logger.info("{}: Last checkpoint metadata file is written for version: {}",
tablePath, version);

logger.info("{}: Finished checkpoint for version: {}", tablePath, version);
}
Expand Down Expand Up @@ -250,7 +250,8 @@ private CloseableIterator<FileStatus> listFrom(Engine engine, long startVersion)
throws IOException {
logger.debug("{}: startVersion: {}", tablePath, startVersion);
return wrapEngineExceptionThrowsIO(
() -> engine.getFileSystemClient().listFrom(FileNames.listingPrefix(logPath, startVersion)),
() -> engine.getFileSystemClient()
.listFrom(FileNames.listingPrefix(logPath, startVersion)),
"Listing from %s",
FileNames.listingPrefix(logPath, startVersion));
}
Expand All @@ -260,18 +261,19 @@ private CloseableIterator<FileStatus> listFrom(Engine engine, long startVersion)
* file (e.g., 000000000.json), or checkpoint file. (e.g.,
* 000000001.checkpoint.00001.00003.parquet)
*
* @param fileName Name of the file (not the full path)
* @param filePath Full path of the file
* @return Boolean Whether the file is delta log files
*/
private boolean isDeltaCommitOrCheckpointFile(String fileName) {
return FileNames.isCheckpointFile(fileName) || FileNames.isCommitFile(fileName);
private boolean isDeltaCommitOrCheckpointFile(Path filePath) {
return FileNames.isCheckpointFile(filePath) || FileNames.isCommitFile(filePath);
}

/**
* Returns an iterator containing a list of files found in the _delta_log directory starting with
* the startVersion. Returns None if no files are found or the directory is missing.
*/
private Optional<CloseableIterator<FileStatus>> listFromOrNone(Engine engine, long startVersion) {
private Optional<CloseableIterator<FileStatus>> listFromOrNone(
Engine engine, long startVersion) {
// LIST the directory, starting from the provided lower bound (treat missing dir as empty).
// NOTE: "empty/missing" is _NOT_ equivalent to "contains no useful commit files."
try {
Expand Down Expand Up @@ -325,10 +327,11 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
Optional<Long> versionToLoad,
Optional<TableCommitCoordinatorClientHandler> tableCommitHandlerOpt) {
versionToLoad.ifPresent(
v ->
checkArgument(
v >= startVersion,
format("versionToLoad=%s provided is less than startVersion=%s", v, startVersion)));
v -> checkArgument(
v >= startVersion,
format("versionToLoad=%s provided is less than startVersion=%s", v, startVersion)
)
);
logger.debug(
"startVersion: {}, versionToLoad: {}, coordinated commits enabled: {}",
startVersion,
Expand All @@ -350,16 +353,16 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(

while (fileStatusesIter.hasNext()) {
final FileStatus fileStatus = fileStatusesIter.next();
final String fileName = getName(fileStatus.getPath());
final Path filePath = new Path(fileStatus.getPath());

// Pick up all checkpoint and delta files
if (!isDeltaCommitOrCheckpointFile(fileName)) {
if (!isDeltaCommitOrCheckpointFile(filePath)) {
continue;
}

// Checkpoint files of 0 size are invalid but may be ignored silently when read,
// hence we drop them so that we never pick up such checkpoints.
if (FileNames.isCheckpointFile(fileName) && fileStatus.getSize() == 0) {
if (FileNames.isCheckpointFile(filePath) && fileStatus.getSize() == 0) {
continue;
}
// Take files until the version we want to load
Expand All @@ -385,10 +388,11 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
// files and so maxDeltaVersionSeen should be equal to fileVersion.
// But we are being defensive here and taking max of all the
// fileVersions seen.
if (FileNames.isCommitFile(fileName)) {
if (FileNames.isCommitFile(filePath)) {
maxDeltaVersionSeen.set(
Math.max(
maxDeltaVersionSeen.get(), FileNames.deltaVersion(fileStatus.getPath())));
maxDeltaVersionSeen.get(),
FileNames.deltaVersion(fileStatus.getPath())));
}
output.add(fileStatus);
}
Expand Down Expand Up @@ -551,8 +555,8 @@ private Optional<LogSegment> getLogSegmentFrom(
* ensure that the delta files are contiguous.
*
* @param startCheckpoint A potential start version to perform the listing of the DeltaLog,
* typically that of a known checkpoint. If this version's not provided, we will start listing
* from version 0.
* typically that of a known checkpoint. If this version's not provided, we will start
* listing from version 0.
* @param versionToLoad A specific version we try to load, but we may only load a version before
* this version if this version of commit is un-backfilled. Typically used with time travel
* and the Delta streaming source. If not provided, we will try to load the latest version of
Expand Down Expand Up @@ -662,7 +666,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
Tuple2<List<FileStatus>, List<FileStatus>> checkpointsAndDeltas =
ListUtils.partition(
newFiles,
fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName()));
fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath())));
final List<FileStatus> checkpoints = checkpointsAndDeltas._1;
final List<FileStatus> deltas = checkpointsAndDeltas._2;

Expand All @@ -682,7 +686,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(

final List<CheckpointInstance> checkpointFiles =
checkpoints.stream()
.map(f -> new CheckpointInstance(f.getPath()))
.map(f -> new CheckpointInstance(new Path(f.getPath())))
.collect(Collectors.toList());
logDebug(() -> format("checkpointFiles: %s", Arrays.toString(checkpointFiles.toArray())));

Expand Down Expand Up @@ -755,7 +759,8 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
.collect(Collectors.toCollection(LinkedList::new));

logDebug(
() -> format("deltaVersions: %s", Arrays.toString(deltaVersionsAfterCheckpoint.toArray())));
() ->
format("deltaVersions: %s", Arrays.toString(deltaVersionsAfterCheckpoint.toArray())));

final long newVersion =
deltaVersionsAfterCheckpoint.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,19 @@ public static long deltaVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

public static long deltaVersion(String path) {
final int slashIdx = path.lastIndexOf(Path.SEPARATOR);
final String name = path.substring(slashIdx + 1);
return Long.parseLong(name.split("\\.")[0]);
public static long deltaVersion(String pathString) {
final Path path = new Path(pathString);
return deltaVersion(path);
}

/** Returns the version for the given checkpoint path. */
public static long checkpointVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

public static long checkpointVersion(String path) {
final int slashIdx = path.lastIndexOf(Path.SEPARATOR);
final String name = path.substring(slashIdx + 1);
return Long.parseLong(name.split("\\.")[0]);
public static long checkpointVersion(String pathString) {
final Path path = new Path(pathString);
return checkpointVersion(path);
}

public static String sidecarFile(Path path, String sidecar) {
Expand Down Expand Up @@ -128,26 +126,25 @@ public static List<Path> checkpointFileWithParts(Path path, long version, int nu
return output;
}

public static boolean isCheckpointFile(String fileName) {
return CHECKPOINT_FILE_PATTERN.matcher(new Path(fileName).getName()).matches();
public static boolean isCheckpointFile(Path filePath) {
return CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isClassicCheckpointFile(String fileName) {
return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
public static boolean isClassicCheckpointFile(Path filePath) {
return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isMulitPartCheckpointFile(String fileName) {
return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
public static boolean isMultiPartCheckpointFile(Path filePath) {
return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isV2CheckpointFile(String fileName) {
return V2_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
public static boolean isV2CheckpointFile(Path filePath) {
return V2_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isCommitFile(String fileName) {
String filename = new Path(fileName).getName();
return DELTA_FILE_PATTERN.matcher(filename).matches()
|| UUID_DELTA_FILE_REGEX.matcher(filename).matches();
public static boolean isCommitFile(Path filePath) {
return DELTA_FILE_PATTERN.matcher(filePath.getName()).matches()
|| UUID_DELTA_FILE_REGEX.matcher(filePath.getName()).matches();
}

/**
Expand All @@ -157,9 +154,9 @@ public static boolean isCommitFile(String fileName) {
* upgrade.
*/
public static long getFileVersion(Path path) {
if (isCheckpointFile(path.getName())) {
if (isCheckpointFile(path)) {
return checkpointVersion(path);
} else if (isCommitFile(path.getName())) {
} else if (isCommitFile(path)) {
return deltaVersion(path);
// } else if (isChecksumFile(path)) {
// checksumVersion(path);
Expand Down
Loading

0 comments on commit 69f9c90

Please sign in to comment.