diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java index 099e56d2b6a..b38573f0633 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java @@ -16,7 +16,6 @@ package io.delta.kernel.internal; import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; -import static io.delta.kernel.internal.fs.Path.getName; import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.TableNotFoundException; @@ -92,11 +91,11 @@ public static long getEarliestRecreatableCommit(Engine engine, Path logPath) throws TableNotFoundException { try (CloseableIterator files = listFrom(engine, logPath, 0) - .filter( - fs -> - FileNames.isCommitFile(getName(fs.getPath())) - || FileNames.isCheckpointFile(getName(fs.getPath())))) { - + .filter(fs -> { + final Path path = new Path(fs.getPath()); + return FileNames.isCommitFile(path) + || FileNames.isCheckpointFile(path); + })) { if (!files.hasNext()) { // listFrom already throws an error if the directory is truly empty, thus this must // be because no files are checkpoint or delta files @@ -114,8 +113,8 @@ public static long getEarliestRecreatableCommit(Engine engine, Path logPath) // remember it and return it once we detect that we've seen a smaller or equal delta // version. while (files.hasNext()) { - String nextFilePath = files.next().getPath(); - if (FileNames.isCommitFile(getName(nextFilePath))) { + final Path nextFilePath = new Path(files.next().getPath()); + if (FileNames.isCommitFile(nextFilePath)) { long version = FileNames.deltaVersion(nextFilePath); if (version == 0L) { return version; @@ -200,7 +199,7 @@ private static List getCommits(Engine engine, Path logPath, long start) throws TableNotFoundException { CloseableIterator commits = listFrom(engine, logPath, start) - .filter(fs -> FileNames.isCommitFile(getName(fs.getPath()))) + .filter(fs -> FileNames.isCommitFile(new Path(fs.getPath()))) .map(fs -> new Commit(FileNames.deltaVersion(fs.getPath()), fs.getModificationTime())); return monotonizeCommitTimestamps(commits); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/CheckpointInstance.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/CheckpointInstance.java index d189bf2021c..0284a20b457 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/CheckpointInstance.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/CheckpointInstance.java @@ -51,11 +51,11 @@ public boolean usesSidecars() { public final Optional filePath; // Guaranteed to be present for V2 checkpoints. - public CheckpointInstance(String path) { + public CheckpointInstance(Path path) { Preconditions.checkArgument( FileNames.isCheckpointFile(path), "not a valid checkpoint file name"); - String[] pathParts = getPathName(path).split("\\."); + String[] pathParts = path.getName().split("\\."); if (pathParts.length == 3 && pathParts[2].equals("parquet")) { // Classic checkpoint 00000000000000000010.checkpoint.parquet @@ -75,9 +75,9 @@ public CheckpointInstance(String path) { this.version = Long.parseLong(pathParts[0]); this.numParts = Optional.empty(); this.format = CheckpointFormat.V2; - this.filePath = Optional.of(new Path(path)); + this.filePath = Optional.of(path); } else { - throw new RuntimeException("Unrecognized checkpoint path format: " + getPathName(path)); + throw new RuntimeException("Unrecognized checkpoint path format: " + path.getName()); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java index 4f09b706a59..52d99cd4f80 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java @@ -111,13 +111,13 @@ public static Optional findLastCompleteCheckpointBefore( List checkpoints = new ArrayList<>(); while (deltaLogFileIter.hasNext()) { FileStatus fileStatus = deltaLogFileIter.next(); - String fileName = new Path(fileStatus.getPath()).getName(); + Path filePath = new Path(fileStatus.getPath()); long currentFileVersion; - if (FileNames.isCommitFile(fileName)) { - currentFileVersion = FileNames.deltaVersion(fileName); - } else if (FileNames.isCheckpointFile(fileName)) { - currentFileVersion = FileNames.checkpointVersion(fileName); + if (FileNames.isCommitFile(filePath)) { + currentFileVersion = FileNames.deltaVersion(filePath); + } else if (FileNames.isCheckpointFile(filePath)) { + currentFileVersion = FileNames.checkpointVersion(filePath); } else { // allow all other types of files. currentFileVersion = currentVersion; @@ -133,7 +133,7 @@ public static Optional findLastCompleteCheckpointBefore( break; } if (validCheckpointFile(fileStatus)) { - checkpoints.add(new CheckpointInstance(fileStatus.getPath())); + checkpoints.add(new CheckpointInstance(filePath)); } numberOfFilesSearched++; } @@ -162,7 +162,7 @@ public static Optional findLastCompleteCheckpointBefore( } private static boolean validCheckpointFile(FileStatus fileStatus) { - return FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName()) + return FileNames.isCheckpointFile(new Path(fileStatus.getPath())) && fileStatus.getSize() > 0; } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/fs/Path.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/fs/Path.java index ee70bdd47cd..eab603e21f3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/fs/Path.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/fs/Path.java @@ -529,8 +529,4 @@ public void validateObject() throws InvalidObjectException { throw new InvalidObjectException("No URI in deserialized Path"); } } - - public static String getName(String pathString) { - return new Path(pathString).getName(); - } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java index bfb51ff5402..c1113e9d847 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java @@ -263,7 +263,6 @@ public ColumnarBatch extractSidecarsFromBatch( // Sidecars will exist in schema. Extract sidecar files, then remove sidecar files from // batch output. - List outputFiles = new ArrayList<>(); int sidecarIndex = columnarBatch.getSchema().fieldNames().indexOf(LogReplay.SIDECAR_FIELD_NAME); ColumnVector sidecarVector = columnarBatch.getColumnVector(sidecarIndex); for (int i = 0; i < columnarBatch.getSize(); i++) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java index d4d352abd23..135c0457e11 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java @@ -30,6 +30,7 @@ import io.delta.kernel.internal.*; import io.delta.kernel.internal.actions.CommitInfo; import io.delta.kernel.internal.actions.SetTransaction; +import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; @@ -238,7 +239,7 @@ private List getWinningCommitFiles(Engine engine) { List winningCommitFiles = new ArrayList<>(); while (files.hasNext()) { FileStatus file = files.next(); - if (FileNames.isCommitFile(file.getPath())) { + if (FileNames.isCommitFile(new Path(file.getPath()))) { winningCommitFiles.add(file); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java index 5e0d77e63c9..7d22276f1b3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java @@ -34,21 +34,21 @@ public enum LogType { } public static DeltaLogFile forCommitOrCheckpoint(FileStatus file) { - String fileName = new Path(file.getPath()).getName(); - LogType logType = null; - long version = -1; - if (FileNames.isCommitFile(fileName)) { + Path filePath = new Path(file.getPath()); + LogType logType; + long version; + if (FileNames.isCommitFile(filePath)) { logType = LogType.COMMIT; - version = FileNames.deltaVersion(fileName); - } else if (FileNames.isClassicCheckpointFile(fileName)) { + version = FileNames.deltaVersion(filePath); + } else if (FileNames.isClassicCheckpointFile(filePath)) { logType = LogType.CHECKPOINT_CLASSIC; - version = FileNames.checkpointVersion(fileName); - } else if (FileNames.isMulitPartCheckpointFile(fileName)) { + version = FileNames.checkpointVersion(filePath); + } else if (FileNames.isMultiPartCheckpointFile(filePath)) { logType = LogType.MULTIPART_CHECKPOINT; - version = FileNames.checkpointVersion(fileName); - } else if (FileNames.isV2CheckpointFile(fileName)) { + version = FileNames.checkpointVersion(filePath); + } else if (FileNames.isV2CheckpointFile(filePath)) { logType = LogType.V2_CHECKPOINT_MANIFEST; - version = FileNames.checkpointVersion(fileName); + version = FileNames.checkpointVersion(filePath); } else { throw new IllegalArgumentException( "File is not a commit or checkpoint file: " + file.getPath()); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index a8c22bc8b5d..245d16cd7de 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -19,7 +19,6 @@ import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable; import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore; -import static io.delta.kernel.internal.fs.Path.getName; import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static java.lang.String.format; @@ -221,7 +220,8 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio Checkpointer checkpointer = new Checkpointer(logPath); checkpointer.writeLastCheckpointFile(engine, checkpointMetaData); - logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version); + logger.info("{}: Last checkpoint metadata file is written for version: {}", + tablePath, version); logger.info("{}: Finished checkpoint for version: {}", tablePath, version); } @@ -250,7 +250,8 @@ private CloseableIterator listFrom(Engine engine, long startVersion) throws IOException { logger.debug("{}: startVersion: {}", tablePath, startVersion); return wrapEngineExceptionThrowsIO( - () -> engine.getFileSystemClient().listFrom(FileNames.listingPrefix(logPath, startVersion)), + () -> engine.getFileSystemClient() + .listFrom(FileNames.listingPrefix(logPath, startVersion)), "Listing from %s", FileNames.listingPrefix(logPath, startVersion)); } @@ -260,18 +261,19 @@ private CloseableIterator listFrom(Engine engine, long startVersion) * file (e.g., 000000000.json), or checkpoint file. (e.g., * 000000001.checkpoint.00001.00003.parquet) * - * @param fileName Name of the file (not the full path) + * @param filePath Full path of the file * @return Boolean Whether the file is delta log files */ - private boolean isDeltaCommitOrCheckpointFile(String fileName) { - return FileNames.isCheckpointFile(fileName) || FileNames.isCommitFile(fileName); + private boolean isDeltaCommitOrCheckpointFile(Path filePath) { + return FileNames.isCheckpointFile(filePath) || FileNames.isCommitFile(filePath); } /** * Returns an iterator containing a list of files found in the _delta_log directory starting with * the startVersion. Returns None if no files are found or the directory is missing. */ - private Optional> listFromOrNone(Engine engine, long startVersion) { + private Optional> listFromOrNone( + Engine engine, long startVersion) { // LIST the directory, starting from the provided lower bound (treat missing dir as empty). // NOTE: "empty/missing" is _NOT_ equivalent to "contains no useful commit files." try { @@ -325,10 +327,11 @@ protected final Optional> listDeltaAndCheckpointFiles( Optional versionToLoad, Optional tableCommitHandlerOpt) { versionToLoad.ifPresent( - v -> - checkArgument( - v >= startVersion, - format("versionToLoad=%s provided is less than startVersion=%s", v, startVersion))); + v -> checkArgument( + v >= startVersion, + format("versionToLoad=%s provided is less than startVersion=%s", v, startVersion) + ) + ); logger.debug( "startVersion: {}, versionToLoad: {}, coordinated commits enabled: {}", startVersion, @@ -350,16 +353,16 @@ protected final Optional> listDeltaAndCheckpointFiles( while (fileStatusesIter.hasNext()) { final FileStatus fileStatus = fileStatusesIter.next(); - final String fileName = getName(fileStatus.getPath()); + final Path filePath = new Path(fileStatus.getPath()); // Pick up all checkpoint and delta files - if (!isDeltaCommitOrCheckpointFile(fileName)) { + if (!isDeltaCommitOrCheckpointFile(filePath)) { continue; } // Checkpoint files of 0 size are invalid but may be ignored silently when read, // hence we drop them so that we never pick up such checkpoints. - if (FileNames.isCheckpointFile(fileName) && fileStatus.getSize() == 0) { + if (FileNames.isCheckpointFile(filePath) && fileStatus.getSize() == 0) { continue; } // Take files until the version we want to load @@ -385,10 +388,11 @@ protected final Optional> listDeltaAndCheckpointFiles( // files and so maxDeltaVersionSeen should be equal to fileVersion. // But we are being defensive here and taking max of all the // fileVersions seen. - if (FileNames.isCommitFile(fileName)) { + if (FileNames.isCommitFile(filePath)) { maxDeltaVersionSeen.set( Math.max( - maxDeltaVersionSeen.get(), FileNames.deltaVersion(fileStatus.getPath()))); + maxDeltaVersionSeen.get(), + FileNames.deltaVersion(fileStatus.getPath()))); } output.add(fileStatus); } @@ -551,8 +555,8 @@ private Optional getLogSegmentFrom( * ensure that the delta files are contiguous. * * @param startCheckpoint A potential start version to perform the listing of the DeltaLog, - * typically that of a known checkpoint. If this version's not provided, we will start listing - * from version 0. + * typically that of a known checkpoint. If this version's not provided, we will start + * listing from version 0. * @param versionToLoad A specific version we try to load, but we may only load a version before * this version if this version of commit is un-backfilled. Typically used with time travel * and the Delta streaming source. If not provided, we will try to load the latest version of @@ -662,7 +666,7 @@ protected Optional getLogSegmentAtOrBeforeVersion( Tuple2, List> checkpointsAndDeltas = ListUtils.partition( newFiles, - fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName())); + fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath()))); final List checkpoints = checkpointsAndDeltas._1; final List deltas = checkpointsAndDeltas._2; @@ -682,7 +686,7 @@ protected Optional getLogSegmentAtOrBeforeVersion( final List checkpointFiles = checkpoints.stream() - .map(f -> new CheckpointInstance(f.getPath())) + .map(f -> new CheckpointInstance(new Path(f.getPath()))) .collect(Collectors.toList()); logDebug(() -> format("checkpointFiles: %s", Arrays.toString(checkpointFiles.toArray()))); @@ -755,7 +759,8 @@ protected Optional getLogSegmentAtOrBeforeVersion( .collect(Collectors.toCollection(LinkedList::new)); logDebug( - () -> format("deltaVersions: %s", Arrays.toString(deltaVersionsAfterCheckpoint.toArray()))); + () -> + format("deltaVersions: %s", Arrays.toString(deltaVersionsAfterCheckpoint.toArray()))); final long newVersion = deltaVersionsAfterCheckpoint.isEmpty() diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java index 93c554c9e8f..f87e8c89d68 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java @@ -54,10 +54,9 @@ public static long deltaVersion(Path path) { return Long.parseLong(path.getName().split("\\.")[0]); } - public static long deltaVersion(String path) { - final int slashIdx = path.lastIndexOf(Path.SEPARATOR); - final String name = path.substring(slashIdx + 1); - return Long.parseLong(name.split("\\.")[0]); + public static long deltaVersion(String pathString) { + final Path path = new Path(pathString); + return deltaVersion(path); } /** Returns the version for the given checkpoint path. */ @@ -65,10 +64,9 @@ public static long checkpointVersion(Path path) { return Long.parseLong(path.getName().split("\\.")[0]); } - public static long checkpointVersion(String path) { - final int slashIdx = path.lastIndexOf(Path.SEPARATOR); - final String name = path.substring(slashIdx + 1); - return Long.parseLong(name.split("\\.")[0]); + public static long checkpointVersion(String pathString) { + final Path path = new Path(pathString); + return checkpointVersion(path); } public static String sidecarFile(Path path, String sidecar) { @@ -128,26 +126,25 @@ public static List checkpointFileWithParts(Path path, long version, int nu return output; } - public static boolean isCheckpointFile(String fileName) { - return CHECKPOINT_FILE_PATTERN.matcher(new Path(fileName).getName()).matches(); + public static boolean isCheckpointFile(Path filePath) { + return CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches(); } - public static boolean isClassicCheckpointFile(String fileName) { - return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches(); + public static boolean isClassicCheckpointFile(Path filePath) { + return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches(); } - public static boolean isMulitPartCheckpointFile(String fileName) { - return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches(); + public static boolean isMultiPartCheckpointFile(Path filePath) { + return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches(); } - public static boolean isV2CheckpointFile(String fileName) { - return V2_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches(); + public static boolean isV2CheckpointFile(Path filePath) { + return V2_CHECKPOINT_FILE_PATTERN.matcher(filePath.getName()).matches(); } - public static boolean isCommitFile(String fileName) { - String filename = new Path(fileName).getName(); - return DELTA_FILE_PATTERN.matcher(filename).matches() - || UUID_DELTA_FILE_REGEX.matcher(filename).matches(); + public static boolean isCommitFile(Path filePath) { + return DELTA_FILE_PATTERN.matcher(filePath.getName()).matches() + || UUID_DELTA_FILE_REGEX.matcher(filePath.getName()).matches(); } /** @@ -157,9 +154,9 @@ public static boolean isCommitFile(String fileName) { * upgrade. */ public static long getFileVersion(Path path) { - if (isCheckpointFile(path.getName())) { + if (isCheckpointFile(path)) { return checkpointVersion(path); - } else if (isCommitFile(path.getName())) { + } else if (isCommitFile(path)) { return deltaVersion(path); // } else if (isChecksumFile(path)) { // checksumVersion(path); diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala index aa38cda8409..9e42c0a4d76 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala @@ -212,7 +212,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { FileNames.checkpointVersion(topLevelFile.getPath) == v } if (matchingCheckpoints.nonEmpty) { - matchingCheckpoints.maxBy(f => new CheckpointInstance(f._1.getPath)) match { + matchingCheckpoints.maxBy(f => new CheckpointInstance(new Path(f._1.getPath))) match { case (c, sidecars) => (Seq(c), sidecars) } } else {