Cover executeGenerate, executeRestore, optimize
johanl-db committed Oct 12, 2023
1 parent 531f3ed commit f49e6b6
Showing 2 changed files with 21 additions and 18 deletions.
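
The change wraps the bodies of `execute` (optimize), `executeGenerate`, and `executeRestore` in `withActiveSession(sparkSession)`, imported from `DeltaTableUtils`. The helper itself is not part of this diff; the sketch below is only an assumption of what such a wrapper typically does (set the given session as the thread's active session, run the block, then restore the previous state), not the actual Delta implementation.

```scala
import org.apache.spark.sql.SparkSession

object ActiveSessionSketch {
  // Illustrative sketch only: the real helper lives in
  // org.apache.spark.sql.delta.DeltaTableUtils and may differ in detail.
  def withActiveSession[T](spark: SparkSession)(body: => T): T = {
    // Remember whichever session is active on this thread, if any.
    val previous = SparkSession.getActiveSession
    SparkSession.setActiveSession(spark)
    try {
      // Run the wrapped operation with `spark` as the active session, so
      // session-dependent lookups (parser, conf, catalog) resolve against it.
      body
    } finally {
      // Restore the caller's active-session state.
      previous match {
        case Some(s) => SparkSession.setActiveSession(s)
        case None => SparkSession.clearActiveSession()
      }
    }
  }
}
```
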
20 changes: 11 additions & 9 deletions core/src/main/scala/io/delta/tables/DeltaOptimizeBuilder.scala
@@ -16,6 +16,7 @@

package io.delta.tables

+import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession
import org.apache.spark.sql.delta.commands.OptimizeTableCommand
import org.apache.spark.sql.delta.util.AnalysisHelper

@@ -74,15 +74,16 @@ class DeltaOptimizeBuilder private(
    execute(attrs)
  }

-  private def execute(zOrderBy: Seq[UnresolvedAttribute]): DataFrame = {
-    val tableId: TableIdentifier = sparkSession
-      .sessionState
-      .sqlParser
-      .parseTableIdentifier(tableIdentifier)
-    val optimize =
-      OptimizeTableCommand(None, Some(tableId), partitionFilter, options)(zOrderBy = zOrderBy)
-    toDataset(sparkSession, optimize)
-  }
+  private def execute(zOrderBy: Seq[UnresolvedAttribute]): DataFrame =
+    withActiveSession(sparkSession) {
+      val tableId: TableIdentifier = sparkSession
+        .sessionState
+        .sqlParser
+        .parseTableIdentifier(tableIdentifier)
+      val optimize =
+        OptimizeTableCommand(None, Some(tableId), partitionFilter, options)(zOrderBy = zOrderBy)
+      toDataset(sparkSession, optimize)
+    }
}

private[delta] object DeltaOptimizeBuilder {
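
The `execute` method above is the common sink for `executeCompaction` and `executeZOrderBy`. As a hedged usage sketch of that public path (the table path and column name are made up, and it assumes a SparkSession already configured for Delta Lake):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object OptimizeExample {
  def main(args: Array[String]): Unit = {
    // Assumes an existing SparkSession configured for Delta Lake.
    val spark: SparkSession = SparkSession.active

    // Hypothetical table path.
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/events")

    // Both calls reach DeltaOptimizeBuilder.execute, which now parses the
    // table identifier and builds OptimizeTableCommand inside
    // withActiveSession(sparkSession).
    deltaTable.optimize().executeCompaction()
    deltaTable.optimize().executeZOrderBy("eventType")
  }
}
```
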
@@ -54,14 +54,15 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
    sparkSession.createDataFrame(history.getHistory(limit))
  }

-  protected def executeGenerate(tblIdentifier: String, mode: String): Unit = {
-    val tableId: TableIdentifier = sparkSession
-      .sessionState
-      .sqlParser
-      .parseTableIdentifier(tblIdentifier)
-    val generate = DeltaGenerateCommand(mode, tableId, self.deltaLog.options)
-    toDataset(sparkSession, generate)
-  }
+  protected def executeGenerate(tblIdentifier: String, mode: String): Unit =
+    withActiveSession(sparkSession) {
+      val tableId: TableIdentifier = sparkSession
+        .sessionState
+        .sqlParser
+        .parseTableIdentifier(tblIdentifier)
+      val generate = DeltaGenerateCommand(mode, tableId, self.deltaLog.options)
+      toDataset(sparkSession, generate)
+    }

  protected def executeUpdate(
      set: Map[String, Column],
@@ -87,7 +88,7 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
  protected def executeRestore(
      table: DeltaTableV2,
      versionAsOf: Option[Long],
-      timestampAsOf: Option[String]): DataFrame = {
+      timestampAsOf: Option[String]): DataFrame = withActiveSession(sparkSession) {
    val identifier = table.getTableIdentifierIfExists.map(
      id => Identifier.of(id.database.toArray, id.table))
    val sourceRelation = DataSourceV2Relation.create(table, None, identifier)
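
`executeGenerate` and `executeRestore` get the same wrapping as the optimize path. As a hedged illustration of the situation this pattern addresses, the sketch below creates two sessions and calls `generate` and `restore` on a table bound to the non-active one; the path, version, and session setup are made up for the example:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object MultiSessionExample {
  def main(args: Array[String]): Unit = {
    // The table is bound to `tableSession`, but a different session is the
    // active one on this thread at call time.
    val tableSession = SparkSession.active
    val otherSession = tableSession.newSession()
    SparkSession.setActiveSession(otherSession)

    // Hypothetical Delta table path.
    val deltaTable = DeltaTable.forPath(tableSession, "/tmp/delta/events")

    // With executeGenerate and executeRestore wrapped in withActiveSession,
    // these calls run with tableSession active rather than otherSession.
    deltaTable.generate("symlink_format_manifest")
    deltaTable.restoreToVersion(0)
  }
}
```
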
