From 0afde444ce11eb20c12252b89aded3bef4370818 Mon Sep 17 00:00:00 2001 From: Charlene Lyu Date: Tue, 20 Aug 2024 10:01:15 -0700 Subject: [PATCH] [Sharing] Add timestamp_ntz to Delta Sharing reader feature header (#3579) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [x] Other (Sharing) ## Description Add timestamp_ntz to Delta Sharing reader feature header. ## How was this patch tested? Unit test ## Does this PR introduce _any_ user-facing changes? --- .../sharing/spark/DeltaSharingUtils.scala | 15 +++- .../spark/DeltaFormatSharingSourceSuite.scala | 68 +++++++++++++++++++ .../DeltaSharingDataSourceDeltaSuite.scala | 37 ++++++++++ .../TestClientForDeltaFormatSharing.scala | 8 ++- 4 files changed, 123 insertions(+), 5 deletions(-) diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala index 0bed514d973..4e9e3360d72 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala @@ -27,7 +27,8 @@ import org.apache.spark.sql.delta.{ DeletionVectorsTableFeature, DeltaLog, DeltaParquetFileFormat, - SnapshotDescriptor + SnapshotDescriptor, + TimestampNTZTableFeature } import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import com.google.common.hash.Hashing @@ -45,9 +46,17 @@ import org.apache.spark.storage.{BlockId, StorageLevel} object DeltaSharingUtils extends Logging { val STREAMING_SUPPORTED_READER_FEATURES: Seq[String] = - Seq(DeletionVectorsTableFeature.name, ColumnMappingTableFeature.name) + Seq( + DeletionVectorsTableFeature.name, + ColumnMappingTableFeature.name, + TimestampNTZTableFeature.name + ) val SUPPORTED_READER_FEATURES: Seq[String] = - Seq(DeletionVectorsTableFeature.name, ColumnMappingTableFeature.name) + Seq( + DeletionVectorsTableFeature.name, + ColumnMappingTableFeature.name, + TimestampNTZTableFeature.name + ) // The prefix will be used for block ids of all blocks that store the delta log in BlockManager. // It's used to ensure delta sharing queries don't mess up with blocks with other applications. diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala index 7539a3f8a67..1c8fa12f872 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala @@ -16,6 +16,8 @@ package io.delta.sharing.spark +import java.time.LocalDateTime + import org.apache.spark.sql.delta.DeltaIllegalStateException import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.DeltaOptions.{ @@ -692,6 +694,72 @@ class DeltaFormatSharingSourceSuite } } + test("streaming works with timestampNTZ") { + withTempDir { tempDir => + val deltaTableName = "delta_table_timestampNTZ" + withTable(deltaTableName) { + sql(s"CREATE TABLE $deltaTableName(c1 TIMESTAMP_NTZ) USING DELTA") + val sharedTableName = "shared_table_timestampNTZ" + prepareMockedClientMetadata(deltaTableName, sharedTableName) + val profileFile = prepareProfileFile(tempDir) + val tablePath = profileFile.getCanonicalPath + s"#share1.default.$sharedTableName" + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + def InsertToDeltaTable(values: String): Unit = { + sql(s"INSERT INTO $deltaTableName VALUES $values") + } + + def processAllAvailableInStream( + sourceOptions: Map[String, String], + expectations: StreamAction*): Unit = { + val df = spark.readStream + .format("deltaSharing") + .options(sourceOptions) + .load(tablePath) + .select("c1") + + val base = Seq(StartStream(), ProcessAllAvailable()) + testStream(df)((base ++ expectations): _*) + } + + // Insert at version 1. + InsertToDeltaTable("""('2022-01-01 02:03:04.123456')""") + // Insert at version 2. + InsertToDeltaTable("""('2022-02-02 03:04:05.123456')""") + + prepareMockedClientAndFileSystemResult( + deltaTable = deltaTableName, + sharedTable = sharedTableName, + versionAsOf = Some(2L) + ) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + processAllAvailableInStream( + Map("responseFormat" -> "delta"), + CheckAnswer( + LocalDateTime.parse("2022-01-01T02:03:04.123456"), + LocalDateTime.parse("2022-02-02T03:04:05.123456") + ) + ) + + prepareMockedClientAndFileSystemResultForStreaming( + deltaTableName, + sharedTableName, + startingVersion = 2, + endingVersion = 2 + ) + processAllAvailableInStream( + Map( + "responseFormat" -> "delta", + "startingVersion" -> "2" + ), + CheckAnswer(LocalDateTime.parse("2022-02-02T03:04:05.123456")) + ) + assertBlocksAreCleanedUp() + } + } + } + } + test("startingVersion works") { withTempDirs { (inputDir, outputDir, checkpointDir) => val deltaTableName = "delta_table_startVersion" diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala index fd96e7d266d..2dd93c22aec 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.types.{ LongType, StringType, StructType, + TimestampNTZType, TimestampType } @@ -1429,6 +1430,42 @@ trait DeltaSharingDataSourceDeltaSuiteBase } } } + + test("DeltaSharingDataSource able to read timestampNTZ table") { + withTempDir { tempDir => + val deltaTableName = "delta_table_timestampNTZ" + withTable(deltaTableName) { + sql(s"CREATE TABLE $deltaTableName(c1 TIMESTAMP_NTZ) USING DELTA") + sql(s"""INSERT INTO $deltaTableName VALUES ('2022-01-02 03:04:05.123456')""") + + val sharedTableName = "shared_table_timestampNTZ" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + def testReadTimestampNTZ(tablePath: String): Unit = { + val expectedSchema: StructType = new StructType() + .add("c1", TimestampNTZType) + assert( + expectedSchema == spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .schema + ) + val sharingDf = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + val deltaDf = spark.read.format("delta").table(deltaTableName) + checkAnswer(sharingDf, deltaDf) + assert(sharingDf.count() > 0) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testReadTimestampNTZ(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } } class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala index 505be42fc04..af89751a226 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala @@ -61,8 +61,12 @@ private[spark] class TestClientForDeltaFormatSharing( assert( responseFormat == DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET || - (readerFeatures.contains("deletionVectors") && readerFeatures.contains("columnMapping")), - "deletionVectors and columnMapping should be supported in all types of queries." + ( + readerFeatures.contains("deletionVectors") && + readerFeatures.contains("columnMapping") && + readerFeatures.contains("timestampNtz") + ), + "deletionVectors, columnMapping, timestampNtz should be supported in all types of queries." ) import TestClientForDeltaFormatSharing._