feat: support update/merge-into using RewriteColumns mode #199
fangbo wants to merge 3 commits into lance-format:main
@jackye1995 @jiaoew1991 @hamersaw Do you think this approach is reasonable?
6799b3a to 9e01904
}

} catch {
  case _: Exception =>
this should not just fail silently
Sorry, that was my mistake. I have changed the code to throw a RuntimeException that tells users to set spark.sql.lance.rewrite_columns to false to disable this feature.
    "spark.sql.extensions", "org.lance.spark.extensions.LanceSparkSessionExtensions")
.config("spark.sql.catalog." + catalogName + ".impl", "dir")
.config("spark.sql.catalog." + catalogName + ".root", tempDir.toString())
.config("spark.sql.catalog." + catalogName + ".storage.rewrite_columns", "false")
seems strange that we are making this a storage option. I think this should be more like a Spark SQL conf, so we can do something like SET to enable/disable it if necessary.
Good suggestion! I have defined the key spark.sql.lance.rewrite_columns as SparkUtil.REWRITE_COLUMNS. The method SparkUtil#rewriteColumns reads the rewrite mode from the Spark session configuration.
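A minimal sketch of the lookup described in this reply, under stated assumptions: a plain `Map` stands in for the Spark session configuration, the class name `RewriteColumnsConf` is hypothetical, and the default of `true` is an assumption inferred from the earlier remark that users set the key to false to disable the feature. Only the key name `spark.sql.lance.rewrite_columns` comes from the discussion.

```java
import java.util.HashMap;
import java.util.Map;

public class RewriteColumnsConf {
    // Key name taken from the discussion above.
    public static final String REWRITE_COLUMNS = "spark.sql.lance.rewrite_columns";

    // Assumption: the feature is on unless it is explicitly disabled.
    public static final boolean DEFAULT_REWRITE_COLUMNS = true;

    // Hypothetical stand-in for SparkUtil#rewriteColumns; the Map plays the
    // role of the session-level configuration.
    public static boolean rewriteColumns(Map<String, String> sessionConf) {
        String value = sessionConf.get(REWRITE_COLUMNS);
        if (value == null) {
            return DEFAULT_REWRITE_COLUMNS;
        }
        return Boolean.parseBoolean(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(rewriteColumns(conf)); // key unset: falls back to the default
        conf.put(REWRITE_COLUMNS, "false");
        System.out.println(rewriteColumns(conf)); // key set: column rewrite disabled
    }
}
```

Reading the flag from session configuration rather than from catalog storage options is what would let a user toggle it per session with a `SET` statement, as the review comment requests.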
  writer.writeBatch();
  writer.end();
} catch (IOException e) {
  throw new RuntimeException("Cannot write schema root", e);
I don't think you need to rethrow as RuntimeException? The method already declares throws IOException.
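As an illustration of this comment, here is a small sketch of a method that lets `IOException` propagate instead of wrapping it. The class and method names are hypothetical and the stream is a stand-in for the Arrow writer in the diff; it is not the PR's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class PropagateIOException {
    // The signature already declares IOException, so the body needs no
    // try/catch and no RuntimeException wrapper: callers see the original error.
    public static void writeAll(OutputStream out, String payload) throws IOException {
        out.write(payload.getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeAll(out, "batch");
        System.out.println(out.size());
    }
}
```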
private final boolean useQueuedWriteBuffer;
private final int queueDepth;
private final int batchSize;
private final boolean rewriteColumns;
looks like this is added but never used?
}

public void setUpdatedColumns(List<String> updatedColumns) {
  LOG.info("Set updated columns: {}", updatedColumns);
this feels like a debug message, or could just be removed?
private final Set<Long> fieldsModified = new HashSet<>();
private final Map<Integer, FragmentMetadata> updatedFragments = new HashMap<>();

private int currentUpdateFragmentId = -1;
can we use Optional instead to be clear?
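One way to read this suggestion, sketched with `java.util.OptionalInt` in place of the `-1` sentinel. Only the field name `currentUpdateFragmentId` comes from the diff; the class and its methods are hypothetical and exist purely to show the pattern.

```java
import java.util.OptionalInt;

public class CurrentFragment {
    // Empty means "no fragment is currently being updated"; this replaces -1.
    private OptionalInt currentUpdateFragmentId = OptionalInt.empty();

    public void startFragment(int fragmentId) {
        currentUpdateFragmentId = OptionalInt.of(fragmentId);
    }

    public void finishFragment() {
        currentUpdateFragmentId = OptionalInt.empty();
    }

    public boolean isUpdating() {
        return currentUpdateFragmentId.isPresent();
    }

    // True only when a fragment is in progress and the incoming row belongs
    // to a different one.
    public boolean isSwitchingTo(int fragmentId) {
        return currentUpdateFragmentId.isPresent()
            && currentUpdateFragmentId.getAsInt() != fragmentId;
    }
}
```

The explicit empty state removes the need for readers to know that `-1` is not a valid fragment ID.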
jackye1995 left a comment
mostly looks good to me, could you also update the related documentation?
@jackye1995 Thanks for your review. I have addressed the comments and added documentation in update.md
docs/src/operations/dml/update.md
Outdated
## Column Rewrite Mode

`lance-spark` introduces a column rewrite mode for `UPDATE` and `MERGE INTO` operations, which can significantly improve performance for narrow updates that only affect a few columns.
I think you should also update the MERGE doc? Currently the update doc talks about both update and merge.
Thanks for your advice. I have added a new document, merge-into.md.
4d62dc0 to 4ceb03b
@jackye1995 I have rebased this PR and fixed the issues you commented on. Could you please review it again? Thanks a lot.
6f9677d to e4fe695
a963cae to ececb75
This is for #166
- `rewrite_columns` specifies whether update/merge-into uses RewriteColumns mode. If the parameter value is false, RewriteRows mode is used, which means that the rows are deleted and the new updated rows are inserted.
- For `update` or `merge into`, the specific updated columns are injected into LancePositionDeltaOperation.
- If `rewrite_columns` is true, LancePositionDeltaOperation.representUpdateAsDeleteAndInsert returns false. This means that LanceDeltaWriter.update is invoked to update the columns.
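The mode selection described above can be modeled roughly as follows. This is a plain-Java sketch under stated assumptions, not the PR's implementation: `DeltaOperation` is a hypothetical stand-in for LancePositionDeltaOperation, the `Mode` enum and `modeFor` helper are invented for illustration, and returning true when `rewrite_columns` is false is inferred from the description of RewriteRows mode rather than stated outright.

```java
public class UpdateModeSketch {
    public enum Mode { REWRITE_COLUMNS, REWRITE_ROWS }

    // Hypothetical stand-in for LancePositionDeltaOperation.
    public static final class DeltaOperation {
        private final boolean rewriteColumns;

        public DeltaOperation(boolean rewriteColumns) {
            this.rewriteColumns = rewriteColumns;
        }

        // As described: false when rewrite_columns is true, so updates are
        // not split into a delete plus an insert.
        public boolean representUpdateAsDeleteAndInsert() {
            return !rewriteColumns;
        }
    }

    // Illustrative helper: maps the flag to the mode the engine would follow.
    public static Mode modeFor(DeltaOperation op) {
        return op.representUpdateAsDeleteAndInsert()
            ? Mode.REWRITE_ROWS      // rows are deleted and re-inserted
            : Mode.REWRITE_COLUMNS;  // the writer's update path handles changed columns
    }

    public static void main(String[] args) {
        System.out.println(modeFor(new DeltaOperation(true)));
        System.out.println(modeFor(new DeltaOperation(false)));
    }
}
```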