[Spark] Disable implicit casting in Delta streaming sink (#3691)
## Description
#3443 introduced implicit casting when writing to a Delta table using a streaming query.

We are disabling this change for now, as it regresses behavior when a struct field is missing from the input data. Such writes previously succeeded, filling the missing fields with `null`, but would now fail with:
```
[DELTA_UPDATE_SCHEMA_MISMATCH_EXPRESSION] Cannot cast struct<name:string> to struct<name:string,age:bigint>. All nested columns must match.
```
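For illustration, a minimal sketch of the affected scenario (table name, column names, and checkpoint path are hypothetical): a streaming write whose input struct omits a nested field that exists in the target table.
```
import org.apache.spark.sql.functions.{lit, struct}

// Hypothetical target table whose struct has a field the input won't provide.
spark.sql("CREATE TABLE target (person STRUCT<name: STRING, age: BIGINT>) USING delta")

// Streaming input whose `person` struct is missing `person.age`.
val input = spark.readStream
  .format("rate").load()
  .select(struct(lit("alice").as("name")).as("person"))

// Before #3443 the sink filled `person.age` with null; with implicit casting
// enabled, this fails with DELTA_UPDATE_SCHEMA_MISMATCH_EXPRESSION.
input.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/_checkpoint")
  .toTable("target")
```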

Note: batch INSERT fails in this scenario with:
```
[DELTA_INSERT_COLUMN_ARITY_MISMATCH] Cannot write to '<table>', not enough nested fields in <struct>; target table has 3 column(s) but the inserted data has 2 column(s)
```
but since streaming writes allowed this, we have to preserve that behavior.
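For contrast, a sketch of the equivalent batch INSERT on the same hypothetical table, which is rejected up front:
```
// Fails with DELTA_INSERT_COLUMN_ARITY_MISMATCH: the struct has no `age` field.
spark.sql("INSERT INTO target SELECT named_struct('name', 'alice')")
```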

## How was this patch tested?
Tests added as part of #3443, including runs with the flag disabled.

## Does this PR introduce _any_ user-facing changes?
Disables the behavior change that would have been introduced by #3443.
johanl-db committed Sep 23, 2024
1 parent 1753cb5 commit a99f62b
Showing 2 changed files with 10 additions and 2 deletions.
In `DeltaSQLConfBase`, the flag's default flips to off, with a comment explaining why:
```
@@ -1561,7 +1561,9 @@ trait DeltaSQLConfBase {
         |The casting behavior is governed by 'spark.sql.storeAssignmentPolicy'.
         |""".stripMargin)
       .booleanConf
-      .createWithDefault(true)
+      // This feature doesn't properly support structs with missing fields and is disabled until a
+      // fix is implemented.
+      .createWithDefault(false)
 
   val DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
     buildConf("changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled")
```
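Since the conf itself is kept, sessions that depend on the new casting behavior can opt back in; a minimal sketch using the constant exercised by the test suite below:
```
import org.apache.spark.sql.delta.sources.DeltaSQLConf

// Re-enable implicit casting in the Delta streaming sink for this session.
spark.conf.set(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key, "true")
```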
In `DeltaInsertIntoImplicitCastSuite`, the suite re-enables the flag so the tests from #3443 keep covering the casting behavior:
```
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -32,6 +32,12 @@ import org.apache.spark.sql.types._
  */
 class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key, "true")
+    spark.conf.set(SQLConf.ANSI_ENABLED.key, "true")
+  }
+
   for (schemaEvolution <- BOOLEAN_DOMAIN) {
     testInserts("insert with implicit up and down cast on top-level fields, " +
       s"schemaEvolution=$schemaEvolution")(
```
