Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Cleanup the FileNames.java utility methods #3588

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ lazy val commonSettings = Seq(
unidocSourceFilePatterns := Nil,
)

// enforce java code style
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a particular reason why this needs to be moved?

def javafmtCheckSettings() = Seq(
(Compile / compile) := ((Compile / compile) dependsOn (Compile / javafmtCheckAll)).value
)

/**
* Note: we cannot access sparkVersion.value here, since that can only be used within a task or
* setting macro.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.delta.kernel.internal;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.fs.Path.getName;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
Expand Down Expand Up @@ -93,10 +92,10 @@ public static long getEarliestRecreatableCommit(Engine engine, Path logPath)
try (CloseableIterator<FileStatus> files =
listFrom(engine, logPath, 0)
.filter(
fs ->
FileNames.isCommitFile(getName(fs.getPath()))
|| FileNames.isCheckpointFile(getName(fs.getPath())))) {

fs -> {
final Path path = new Path(fs.getPath());
return FileNames.isCommitFile(path) || FileNames.isCheckpointFile(path);
})) {
if (!files.hasNext()) {
// listFrom already throws an error if the directory is truly empty, thus this must
// be because no files are checkpoint or delta files
Expand All @@ -114,8 +113,8 @@ public static long getEarliestRecreatableCommit(Engine engine, Path logPath)
// remember it and return it once we detect that we've seen a smaller or equal delta
// version.
while (files.hasNext()) {
String nextFilePath = files.next().getPath();
if (FileNames.isCommitFile(getName(nextFilePath))) {
final Path nextFilePath = new Path(files.next().getPath());
if (FileNames.isCommitFile(nextFilePath)) {
long version = FileNames.deltaVersion(nextFilePath);
if (version == 0L) {
return version;
Expand Down Expand Up @@ -200,7 +199,7 @@ private static List<Commit> getCommits(Engine engine, Path logPath, long start)
throws TableNotFoundException {
CloseableIterator<Commit> commits =
listFrom(engine, logPath, start)
.filter(fs -> FileNames.isCommitFile(getName(fs.getPath())))
.filter(fs -> FileNames.isCommitFile(new Path(fs.getPath())))
.map(fs -> new Commit(FileNames.deltaVersion(fs.getPath()), fs.getModificationTime()));
return monotonizeCommitTimestamps(commits);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public boolean usesSidecars() {

public final Optional<Path> filePath; // Guaranteed to be present for V2 checkpoints.

public CheckpointInstance(String path) {
public CheckpointInstance(Path path) {
Preconditions.checkArgument(
FileNames.isCheckpointFile(path), "not a valid checkpoint file name");

String[] pathParts = getPathName(path).split("\\.");
String[] pathParts = path.getName().split("\\.");

if (pathParts.length == 3 && pathParts[2].equals("parquet")) {
// Classic checkpoint 00000000000000000010.checkpoint.parquet
Expand All @@ -75,9 +75,9 @@ public CheckpointInstance(String path) {
this.version = Long.parseLong(pathParts[0]);
this.numParts = Optional.empty();
this.format = CheckpointFormat.V2;
this.filePath = Optional.of(new Path(path));
this.filePath = Optional.of(path);
} else {
throw new RuntimeException("Unrecognized checkpoint path format: " + getPathName(path));
throw new RuntimeException("Unrecognized checkpoint path format: " + path.getName());
}
}

Expand Down Expand Up @@ -193,9 +193,4 @@ public int hashCode() {
// the filepath (which is empty) when hashing.
return Objects.hash(version, numParts, format, filePath);
}

private String getPathName(String path) {
int slash = path.lastIndexOf("/");
return path.substring(slash + 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ public static Optional<CheckpointInstance> findLastCompleteCheckpointBefore(
List<CheckpointInstance> checkpoints = new ArrayList<>();
while (deltaLogFileIter.hasNext()) {
FileStatus fileStatus = deltaLogFileIter.next();
String fileName = new Path(fileStatus.getPath()).getName();
Path filePath = new Path(fileStatus.getPath());

long currentFileVersion;
if (FileNames.isCommitFile(fileName)) {
currentFileVersion = FileNames.deltaVersion(fileName);
} else if (FileNames.isCheckpointFile(fileName)) {
currentFileVersion = FileNames.checkpointVersion(fileName);
if (FileNames.isCommitFile(filePath)) {
currentFileVersion = FileNames.deltaVersion(filePath);
} else if (FileNames.isCheckpointFile(filePath)) {
currentFileVersion = FileNames.checkpointVersion(filePath);
} else {
// allow all other types of files.
currentFileVersion = currentVersion;
Expand All @@ -133,7 +133,7 @@ public static Optional<CheckpointInstance> findLastCompleteCheckpointBefore(
break;
}
if (validCheckpointFile(fileStatus)) {
checkpoints.add(new CheckpointInstance(fileStatus.getPath()));
checkpoints.add(new CheckpointInstance(filePath));
}
numberOfFilesSearched++;
}
Expand Down Expand Up @@ -162,8 +162,7 @@ public static Optional<CheckpointInstance> findLastCompleteCheckpointBefore(
}

private static boolean validCheckpointFile(FileStatus fileStatus) {
return FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName())
&& fileStatus.getSize() > 0;
return FileNames.isCheckpointFile(new Path(fileStatus.getPath())) && fileStatus.getSize() > 0;
}

/** The path to the file that holds metadata about the most recent checkpoint. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,4 @@ public void validateObject() throws InvalidObjectException {
throw new InvalidObjectException("No URI in deserialized Path");
}
}

public static String getName(String pathString) {
return new Path(pathString).getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ public ColumnarBatch extractSidecarsFromBatch(

// Sidecars will exist in schema. Extract sidecar files, then remove sidecar files from
// batch output.
List<DeltaLogFile> outputFiles = new ArrayList<>();
int sidecarIndex = columnarBatch.getSchema().fieldNames().indexOf(LogReplay.SIDECAR_FIELD_NAME);
ColumnVector sidecarVector = columnarBatch.getColumnVector(sidecarIndex);
for (int i = 0; i < columnarBatch.getSize(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.SetTransaction;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
Expand Down Expand Up @@ -238,7 +239,7 @@ private List<FileStatus> getWinningCommitFiles(Engine engine) {
List<FileStatus> winningCommitFiles = new ArrayList<>();
while (files.hasNext()) {
FileStatus file = files.next();
if (FileNames.isCommitFile(file.getPath())) {
if (FileNames.isCommitFile(new Path(file.getPath()))) {
winningCommitFiles.add(file);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ public enum LogType {
}

public static DeltaLogFile forCommitOrCheckpoint(FileStatus file) {
String fileName = new Path(file.getPath()).getName();
LogType logType = null;
long version = -1;
if (FileNames.isCommitFile(fileName)) {
Path filePath = new Path(file.getPath());
LogType logType;
long version;
if (FileNames.isCommitFile(filePath)) {
logType = LogType.COMMIT;
version = FileNames.deltaVersion(fileName);
} else if (FileNames.isClassicCheckpointFile(fileName)) {
version = FileNames.deltaVersion(filePath);
} else if (FileNames.isClassicCheckpointFile(filePath)) {
logType = LogType.CHECKPOINT_CLASSIC;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isMulitPartCheckpointFile(fileName)) {
version = FileNames.checkpointVersion(filePath);
} else if (FileNames.isMultiPartCheckpointFile(filePath)) {
logType = LogType.MULTIPART_CHECKPOINT;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isV2CheckpointFile(fileName)) {
version = FileNames.checkpointVersion(filePath);
} else if (FileNames.isV2CheckpointFile(filePath)) {
logType = LogType.V2_CHECKPOINT_MANIFEST;
version = FileNames.checkpointVersion(fileName);
version = FileNames.checkpointVersion(filePath);
} else {
throw new IllegalArgumentException(
"File is not a commit or checkpoint file: " + file.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable;
import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore;
import static io.delta.kernel.internal.fs.Path.getName;
import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.lang.String.format;
Expand Down Expand Up @@ -260,11 +259,11 @@ private CloseableIterator<FileStatus> listFrom(Engine engine, long startVersion)
* file (e.g., 000000000.json), or checkpoint file. (e.g.,
* 000000001.checkpoint.00001.00003.parquet)
*
* @param fileName Name of the file (not the full path)
* @param filePath Full path of the file
* @return Boolean Whether the file is delta log files
*/
private boolean isDeltaCommitOrCheckpointFile(String fileName) {
return FileNames.isCheckpointFile(fileName) || FileNames.isCommitFile(fileName);
private boolean isDeltaCommitOrCheckpointFile(Path filePath) {
return FileNames.isCheckpointFile(filePath) || FileNames.isCommitFile(filePath);
}

/**
Expand Down Expand Up @@ -350,16 +349,16 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(

while (fileStatusesIter.hasNext()) {
final FileStatus fileStatus = fileStatusesIter.next();
final String fileName = getName(fileStatus.getPath());
final Path filePath = new Path(fileStatus.getPath());

// Pick up all checkpoint and delta files
if (!isDeltaCommitOrCheckpointFile(fileName)) {
if (!isDeltaCommitOrCheckpointFile(filePath)) {
continue;
}

// Checkpoint files of 0 size are invalid but may be ignored silently when read,
// hence we drop them so that we never pick up such checkpoints.
if (FileNames.isCheckpointFile(fileName) && fileStatus.getSize() == 0) {
if (FileNames.isCheckpointFile(filePath) && fileStatus.getSize() == 0) {
continue;
}
// Take files until the version we want to load
Expand All @@ -385,7 +384,7 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
// files and so maxDeltaVersionSeen should be equal to fileVersion.
// But we are being defensive here and taking max of all the
// fileVersions seen.
if (FileNames.isCommitFile(fileName)) {
if (FileNames.isCommitFile(filePath)) {
maxDeltaVersionSeen.set(
Math.max(
maxDeltaVersionSeen.get(), FileNames.deltaVersion(fileStatus.getPath())));
Expand Down Expand Up @@ -424,7 +423,7 @@ private List<Commit> getUnbackfilledCommits(
.map(
commitCoordinatorClientHandler -> {
logger.info(
"Getting un-backfilled commits from commit coordinator for " + "table: {}",
"Getting un-backfilled commits from commit coordinator for table: {}",
tablePath);
return commitCoordinatorClientHandler
.getCommits(startVersion, versionToLoad.orElse(null))
Expand Down Expand Up @@ -661,8 +660,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(

Tuple2<List<FileStatus>, List<FileStatus>> checkpointsAndDeltas =
ListUtils.partition(
newFiles,
fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName()));
newFiles, fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath())));
final List<FileStatus> checkpoints = checkpointsAndDeltas._1;
final List<FileStatus> deltas = checkpointsAndDeltas._2;

Expand All @@ -682,7 +680,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(

final List<CheckpointInstance> checkpointFiles =
checkpoints.stream()
.map(f -> new CheckpointInstance(f.getPath()))
.map(f -> new CheckpointInstance(new Path(f.getPath())))
.collect(Collectors.toList());
logDebug(() -> format("checkpointFiles: %s", Arrays.toString(checkpointFiles.toArray())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,19 @@ public static long deltaVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

public static long deltaVersion(String path) {
final int slashIdx = path.lastIndexOf(Path.SEPARATOR);
final String name = path.substring(slashIdx + 1);
return Long.parseLong(name.split("\\.")[0]);
public static long deltaVersion(String pathString) {
final Path path = new Path(pathString);
return deltaVersion(path);
}

/** Returns the version for the given checkpoint path. */
public static long checkpointVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

public static long checkpointVersion(String path) {
final int slashIdx = path.lastIndexOf(Path.SEPARATOR);
final String name = path.substring(slashIdx + 1);
return Long.parseLong(name.split("\\.")[0]);
public static long checkpointVersion(String pathString) {
final Path path = new Path(pathString);
return checkpointVersion(path);
}

public static String sidecarFile(Path path, String sidecar) {
Expand Down Expand Up @@ -128,26 +126,25 @@ public static List<Path> checkpointFileWithParts(Path path, long version, int nu
return output;
}

public static boolean isCheckpointFile(String fileName) {
return CHECKPOINT_FILE_PATTERN.matcher(new Path(fileName).getName()).matches();
public static boolean isCheckpointFile(Path filePath) {
return CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isClassicCheckpointFile(String fileName) {
return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
public static boolean isClassicCheckpointFile(Path filePath) {
return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isMulitPartCheckpointFile(String fileName) {
return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
public static boolean isMultiPartCheckpointFile(Path filePath) {
return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isV2CheckpointFile(String fileName) {
return V2_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
public static boolean isV2CheckpointFile(Path filePath) {
return V2_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches();
}

public static boolean isCommitFile(String fileName) {
String filename = new Path(fileName).getName();
return DELTA_FILE_PATTERN.matcher(filename).matches()
|| UUID_DELTA_FILE_REGEX.matcher(filename).matches();
public static boolean isCommitFile(Path filePath) {
return DELTA_FILE_PATTERN.matcher(filePath.getName()).matches()
|| UUID_DELTA_FILE_REGEX.matcher(filePath.getName()).matches();
}

/**
Expand All @@ -157,9 +154,9 @@ public static boolean isCommitFile(String fileName) {
* upgrade.
*/
public static long getFileVersion(Path path) {
if (isCheckpointFile(path.getName())) {
if (isCheckpointFile(path)) {
return checkpointVersion(path);
} else if (isCommitFile(path.getName())) {
} else if (isCommitFile(path)) {
return deltaVersion(path);
// } else if (isChecksumFile(path)) {
// checksumVersion(path);
Expand Down
Loading
Loading