1 change: 1 addition & 0 deletions docs/src/operations/dml/.pages
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ nav:
- insert-into.md
- update.md
- delete.md
- merge-into.md
- add-columns.md
- update-columns.md
68 changes: 68 additions & 0 deletions docs/src/operations/dml/merge-into.md
@@ -0,0 +1,68 @@
# MERGE INTO

Currently, `MERGE INTO` is only supported on Spark 3.5+.

Lance fully supports Spark's `MERGE INTO` operation. For usage details, refer to Spark's `MERGE INTO` documentation.

```sql
MERGE INTO customers c
USING new_updates u
ON c.id = u.id
WHEN MATCHED THEN
UPDATE SET c.status = u.new_status, c.last_seen = u.timestamp;
```

Additionally, `lance-spark` introduces a column rewrite mode for the `MERGE INTO` operation, which can significantly improve performance for narrow updates that only affect a few columns.

## Column Rewrite Mode

This mode allows the Lance data source to perform column-level updates by writing new data files for only the modified columns, avoiding the need to rewrite the entire data file (the "delete and insert" pattern).

!!! warning "Spark Extension Required"
This feature requires the Lance Spark SQL extension to be enabled.
See [Spark SQL Extensions](../../config.md#spark-sql-extensions) for configuration details.

### Configuration

You can enable or disable this feature using the Spark SQL session configuration `spark.sql.lance.rewrite_columns`.

**Using SQL:**

To enable the feature for the current session:
```sql
SET spark.sql.lance.rewrite_columns = true;
```

To disable it:
```sql
SET spark.sql.lance.rewrite_columns = false;
```

### Behavior and Semantics

- When `spark.sql.lance.rewrite_columns` is set to `true`, `MERGE INTO ... WHEN MATCHED UPDATE` operations will attempt to perform column-level updates. Instead of deleting the matched rows and inserting new versions, the engine will only write new versions of the changed columns.
- When the configuration is set to `false` (the default behavior), the operation falls back to rewriting the affected rows (a "delete and insert" operation).

### Examples

**MERGE INTO with RewriteColumns**

When enabled, the `UPDATE` clause in a `MERGE INTO` statement will benefit from this optimization.

```sql
-- Enable column rewrite mode
SET spark.sql.lance.rewrite_columns = true;

-- This will update the 'status' and 'last_seen' columns without rewriting the whole row
MERGE INTO customers c
USING new_updates u
ON c.id = u.id
WHEN MATCHED THEN
UPDATE SET c.status = u.new_status, c.last_seen = u.timestamp;
```

### Notes

- **Spark Version**: `MERGE INTO` operations are supported on Spark 3.5 and newer.
- **Nested Fields**: Updating nested fields follows the existing semantics of `MERGE INTO`. The entire top-level column containing the nested field will be rewritten.
- **Troubleshooting**: If you encounter any issues or unexpected behavior with this feature, you can disable it by setting `spark.sql.lance.rewrite_columns` to `false` to revert to the row-rewrite behavior.
53 changes: 53 additions & 0 deletions docs/src/operations/dml/update.md
Expand Up @@ -34,3 +34,56 @@ SET tags = ARRAY('ios', 'mobile')
WHERE event_id = 1001;
```

## Column Rewrite Mode

`lance-spark` introduces a column rewrite mode for `UPDATE` operations, which can significantly improve performance for narrow updates that only affect a few columns.

This mode allows the Lance data source to perform column-level updates by writing new data files for only the modified columns, avoiding the need to rewrite the entire data file (the "delete and insert" pattern).

!!! warning "Spark Extension Required"
This feature requires the Lance Spark SQL extension to be enabled.
See [Spark SQL Extensions](../../config.md#spark-sql-extensions) for configuration details.

### Configuration

You can enable or disable this feature using the Spark SQL session configuration `spark.sql.lance.rewrite_columns`.

**Using SQL:**

To enable the feature for the current session:
```sql
SET spark.sql.lance.rewrite_columns = true;
```

To disable it:
```sql
SET spark.sql.lance.rewrite_columns = false;
```

### Behavior and Semantics

- When `spark.sql.lance.rewrite_columns` is set to `true`, `UPDATE` operations will attempt to perform column-level updates. Instead of deleting the affected rows and inserting new versions, the engine will only write new versions of the changed columns.
- When the configuration is set to `false` (the default behavior), the operations fall back to rewriting the affected rows (a "delete and insert" operation).

### Examples

**UPDATE with RewriteColumns**

Here is an example of enabling the mode and performing an `UPDATE`.

```sql
-- Enable column rewrite mode
SET spark.sql.lance.rewrite_columns = true;

-- Assume 'users' table has columns: id, name, address
-- This operation will only write new data for the 'name' column
UPDATE users
SET name = 'New User Name'
WHERE id > 100;
```

### Notes

- **Spark Version**: `UPDATE` operations are supported on Spark 3.5 and newer.
- **Nested Fields**: Updating nested fields follows the existing semantics of `UPDATE`. The entire top-level column containing the nested field will be rewritten.
- **Troubleshooting**: If you encounter any issues or unexpected behavior with this feature, you can disable it by setting `spark.sql.lance.rewrite_columns` to `false` to revert to the row-rewrite behavior.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package org.lance.spark;

import org.lance.spark.read.LanceScanBuilder;
import org.lance.spark.utils.SparkUtil;
import org.lance.spark.write.SparkPositionDeltaWriteBuilder;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.ScanBuilder;
Expand All @@ -26,9 +28,11 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.util.List;
import java.util.Map;

public class LancePositionDeltaOperation implements RowLevelOperation, SupportsDelta {

private final Command command;
private final StructType sparkSchema;
private final LanceSparkReadOptions readOptions;
Expand All @@ -44,6 +48,8 @@ public class LancePositionDeltaOperation implements RowLevelOperation, SupportsD

private final Map<String, String> namespaceProperties;

private List<String> updatedColumns;

public LancePositionDeltaOperation(
Command command,
StructType sparkSchema,
Expand Down Expand Up @@ -82,6 +88,7 @@ public DeltaWriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
.build();
return new SparkPositionDeltaWriteBuilder(
sparkSchema,
updatedColumns,
writeOptions,
initialStorageOptions,
namespaceImpl,
Expand All @@ -103,6 +110,10 @@ public NamedReference[] requiredMetadataAttributes() {

@Override
public boolean representUpdateAsDeleteAndInsert() {
return true;
return !SparkUtil.rewriteColumns(SparkSession.active());
}

public void setUpdatedColumns(List<String> updatedColumns) {
this.updatedColumns = updatedColumns;
}
}
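The `SparkUtil.rewriteColumns` helper called in `representUpdateAsDeleteAndInsert` is not included in this diff. A minimal, self-contained sketch of the plausible logic is shown below, using a plain `Map` as a stand-in for the Spark session conf; the helper name, the config key, and the default-to-false behavior are assumptions based on the docs added in this PR, not the actual implementation.

```java
import java.util.Map;

public class RewriteColumnsSketch {
    static final String KEY = "spark.sql.lance.rewrite_columns";

    // Hypothetical stand-in for SparkUtil.rewriteColumns: reads the flag
    // from a config map and defaults to false when the key is unset,
    // matching the documented default (row rewrites).
    static boolean rewriteColumns(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(KEY, "false"));
    }

    public static void main(String[] args) {
        // Enabled: representUpdateAsDeleteAndInsert() would return false,
        // selecting the column rewrite path.
        System.out.println(rewriteColumns(Map.of(KEY, "true")));
        // Unset: defaults to false, keeping the delete-and-insert path.
        System.out.println(rewriteColumns(Map.of()));
    }
}
```

With this shape, `representUpdateAsDeleteAndInsert()` returning `!rewriteColumns(...)` means the operation is represented as delete-and-insert exactly when the column rewrite flag is off.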