Commit a2363e6

[spark] spark delete with deletion vector (apache#3390)
1 parent 46816e9 commit a2363e6

15 files changed, +569 -75 lines

paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java

Lines changed: 13 additions & 0 deletions
@@ -23,6 +23,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Objects;

 /** A compressed bitmap for 32-bit integer. */
 public class RoaringBitmap32 {
@@ -67,4 +68,16 @@ public void serialize(DataOutput out) throws IOException {
     public void deserialize(DataInput in) throws IOException {
         roaringBitmap.deserialize(in);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RoaringBitmap32 that = (RoaringBitmap32) o;
+        return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+    }
 }
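
The new equals gives RoaringBitmap32 value semantics, which the deletion-vector comparisons below rely on. A minimal sketch of what that enables, assuming the add(int) method from the rest of the class (it is not part of this diff):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.paimon.utils.RoaringBitmap32

val a = new RoaringBitmap32()
val b = new RoaringBitmap32()
// add(int) is assumed here; it belongs to the existing class, not to this diff.
Seq(1, 5, 42).foreach { pos => a.add(pos); b.add(pos) }
assert(a == b) // Scala '==' delegates to the new equals()

// Equality also holds across a serialize/deserialize round trip.
val bytes = new ByteArrayOutputStream()
a.serialize(new DataOutputStream(bytes))
val restored = new RoaringBitmap32()
restored.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
assert(restored == a)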

paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java

Lines changed: 13 additions & 0 deletions
@@ -24,6 +24,7 @@
 import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Objects;

 /**
  * A {@link DeletionVector} based on {@link RoaringBitmap32}, it only supports files with row count
@@ -104,4 +105,16 @@ private void checkPosition(long position) {
                     "The file has too many rows, RoaringBitmap32 only supports files with row count not exceeding 2147483647.");
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BitmapDeletionVector that = (BitmapDeletionVector) o;
+        return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+    }
 }
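
BitmapDeletionVector gets the same value-based equals, so two vectors marking the same rows compare equal. A rough sketch, assuming the no-arg constructor and the delete/isDeleted methods of the surrounding class (none of which appear in this diff):

import org.apache.paimon.deletionvectors.BitmapDeletionVector

// Two vectors built independently but marking the same rows as deleted.
val dv1 = new BitmapDeletionVector() // no-arg constructor assumed
val dv2 = new BitmapDeletionVector()
Seq(3L, 7L).foreach { row => dv1.delete(row); dv2.delete(row) } // delete(long) assumed
assert(dv1.isDeleted(3L)) // isDeleted(long) assumed
assert(!dv1.isDeleted(4L))
assert(dv1 == dv2) // value equality via the new equals()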

paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java

Lines changed: 16 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;

 import javax.annotation.Nullable;

@@ -30,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;

 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

@@ -138,6 +140,20 @@ public DeletionVectorsMaintainer createOrRestore(
         return createOrRestore(deletionVectors);
     }

+    @VisibleForTesting
+    public DeletionVectorsMaintainer createOrRestore(
+            @Nullable Long snapshotId, BinaryRow partition) {
+        List<IndexFileMeta> indexFiles =
+                snapshotId == null
+                        ? Collections.emptyList()
+                        : handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition).stream()
+                                .map(IndexManifestEntry::indexFile)
+                                .collect(Collectors.toList());
+        Map<String, DeletionVector> deletionVectors =
+                new HashMap<>(handler.readAllDeletionVectors(indexFiles));
+        return createOrRestore(deletionVectors);
+    }
+
     public DeletionVectorsMaintainer create() {
         return createOrRestore(new HashMap<>());
     }
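
The new overload restores a maintainer from the deletion-vector index of one snapshot and one partition. A hedged usage sketch; the factory construction and the notifyNewDeletion/writeDeletionVectorsIndex calls are assumptions based on the rest of the class, not on this diff:

import org.apache.paimon.data.BinaryRow
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer
import org.apache.paimon.index.IndexFileHandler

def markRowDeleted(handler: IndexFileHandler, snapshotId: java.lang.Long, partition: BinaryRow): Unit = {
  // Restore the per-partition deletion vectors recorded up to the given snapshot.
  val maintainer = new DeletionVectorsMaintainer.Factory(handler) // factory shape assumed
    .createOrRestore(snapshotId, partition)
  // Record one more deleted row and persist the updated index files (method names assumed).
  maintainer.notifyNewDeletion("data-file-0.orc", 42L)
  val indexFiles = maintainer.writeDeletionVectorsIndex()
  println(s"wrote ${indexFiles.size()} deletion-vector index file(s)")
}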

paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java

Lines changed: 7 additions & 0 deletions
@@ -21,10 +21,12 @@
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.utils.IntIterator;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
@@ -185,6 +187,11 @@ public void deleteManifest(String indexManifest) {
         indexManifestFile.delete(indexManifest);
     }

+    public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
+            Map<String, DeletionFile> dataFileToDeletionFiles) {
+        return new DeletionVectorIndexFileMaintainer(this, dataFileToDeletionFiles);
+    }
+
     public Map<String, DeletionVector> readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
         Map<String, DeletionVector> deletionVectors = new HashMap<>();
         for (IndexFileMeta indexFile : fileMetas) {
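
createDVIndexFileMaintainer wires a map from data file to its DeletionFile into a DeletionVectorIndexFileMaintainer. A sketch of the call, assuming the existing DeletionFile(path, offset, length) constructor; file names and sizes are illustrative:

import java.util.{HashMap => JHashMap}
import org.apache.paimon.index.IndexFileHandler
import org.apache.paimon.table.source.DeletionFile

def buildMaintainer(handler: IndexFileHandler): Unit = {
  val dataFileToDeletionFiles = new JHashMap[String, DeletionFile]()
  // Map each touched data file to the slice of the index file holding its deletion vector.
  dataFileToDeletionFiles.put(
    "data-file-0.orc",
    new DeletionFile("index/deletion-vectors-0", 0L, 128L)) // constructor args assumed
  val maintainer = handler.createDVIndexFileMaintainer(dataFileToDeletionFiles)
  // The maintainer can then mark replaced index entries as DELETE (API not shown in this diff).
}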

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 2 additions & 6 deletions
@@ -184,7 +184,7 @@ public static void validateTableSchema(TableSchema schema) {
                         field));

         if (options.deletionVectorsEnabled()) {
-            validateForDeletionVectors(schema, options);
+            validateForDeletionVectors(options);
         }
     }

@@ -461,11 +461,7 @@ private static void validateDefaultValues(TableSchema schema) {
         }
     }

-    private static void validateForDeletionVectors(TableSchema schema, CoreOptions options) {
-        checkArgument(
-                !schema.primaryKeys().isEmpty(),
-                "Deletion vectors mode is only supported for tables with primary keys.");
-
+    private static void validateForDeletionVectors(CoreOptions options) {
         checkArgument(
                 options.changelogProducer() == ChangelogProducer.NONE
                         || options.changelogProducer() == ChangelogProducer.LOOKUP,
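
Dropping the primary-key check means a table without primary keys can now pass validation with deletion vectors enabled, which is what the new Spark delete path targets; the changelog-producer restriction that follows is unchanged. An illustrative Spark SQL sketch (table and column names are made up):

import org.apache.spark.sql.SparkSession

// Any session with a Paimon catalog configured (illustrative).
val spark: SparkSession = SparkSession.active

spark.sql(
  """CREATE TABLE append_t (id INT, name STRING)
    |TBLPROPERTIES ('deletion-vectors.enabled' = 'true')""".stripMargin)
// Before this change, validation rejected this because append_t has no primary key;
// the changelog-producer must still be 'none' or 'lookup'.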

paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java

Lines changed: 4 additions & 1 deletion
@@ -26,6 +26,7 @@
 import javax.annotation.Nullable;

 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -44,7 +45,9 @@
  * </ul>
  */
 @Public
-public class DeletionFile {
+public class DeletionFile implements Serializable {
+
+    private static final long serialVersionUID = 1L;

     private final String path;
     private final long offset;
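
DeletionFile now has to travel inside Spark tasks for the new delete path, hence Serializable. A quick round-trip sketch, assuming the existing (path, offset, length) constructor, which this diff does not show:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.paimon.table.source.DeletionFile

val original = new DeletionFile("index/deletion-vectors-0", 0L, 128L) // constructor args assumed
val buffer = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(buffer)
oos.writeObject(original) // legal now that DeletionFile implements Serializable
oos.close() // flush the object stream before reading the bytes back
val copy = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  .readObject()
  .asInstanceOf[DeletionFile]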

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala

Lines changed: 7 additions & 3 deletions
@@ -18,19 +18,23 @@

 package org.apache.paimon.spark

+import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.table.Table
 import org.apache.paimon.table.source.{DataSplit, Split}

 import org.apache.spark.sql.connector.read.{Batch, Scan}
 import org.apache.spark.sql.types.StructType

 /** For internal use only. */
-case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends Scan {
-
+case class PaimonSplitScan(
+    table: Table,
+    dataSplits: Array[DataSplit],
+    metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
+  extends Scan {
   override def readSchema(): StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())

   override def toBatch: Batch = {
-    PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder)
+    PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder, metadataColumns)
   }

 }
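
PaimonSplitScan can now request metadata columns (such as file path and row index) that the deletion-vector collection in this commit relies on. A hedged construction sketch; obtaining the splits and the concrete PaimonMetadataColumn values is outside this diff:

import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.DataSplit

def planScan(table: Table, splits: Array[DataSplit], metaCols: Seq[PaimonMetadataColumn]) = {
  // Old call sites keep working: metadataColumns defaults to Seq.empty.
  val plainScan = PaimonSplitScan(table, splits)
  // The delete path can additionally ask for metadata columns and read them via toBatch.
  val scanWithMeta = PaimonSplitScan(table, splits, metaCols)
  scanWithMeta.toBatch
}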

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala

Lines changed: 59 additions & 29 deletions
@@ -20,13 +20,12 @@ package org.apache.paimon.spark.commands

 import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions.MergeEngine
-import org.apache.paimon.spark.{InsertInto, SparkTable}
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
 import org.apache.paimon.utils.RowDataPartitionComputer
@@ -56,7 +55,7 @@ case class DeleteFromPaimonTableCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {

-    val commit = table.store.newCommit(UUID.randomUUID.toString)
+    val commit = fileStore.newCommit(UUID.randomUUID.toString)
     if (condition == null || condition == TrueLiteral) {
       commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
@@ -100,7 +99,7 @@
     val commitMessages = if (usePrimaryKeyDelete()) {
       performPrimaryKeyDelete(sparkSession)
     } else {
-      performDeleteCopyOnWrite(sparkSession)
+      performNonPrimaryKeyDelete(sparkSession)
     }
     writer.commit(commitMessages)
   }
@@ -119,39 +118,70 @@
     writer.write(df)
   }

-  def performDeleteCopyOnWrite(sparkSession: SparkSession): Seq[CommitMessage] = {
+  def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
+    val pathFactory = fileStore.pathFactory()
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
     val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
-    val fileNameToMeta = candidateFileMap(candidateDataSplits)
+    val dataFilePathToMeta = candidateFileMap(candidateDataSplits)

-    // Step2: extract out the exactly files, which must have at least one record to be updated.
-    val touchedFilePaths = findTouchedFiles(candidateDataSplits, condition, relation, sparkSession)
+    if (deletionVectorsEnabled) {
+      // Step2: collect all the deletion vectors that marks the deleted rows.
+      val deletionVectors = collectDeletionVectors(
+        candidateDataSplits,
+        dataFilePathToMeta,
+        condition,
+        relation,
+        sparkSession)
+
+      deletionVectors.cache()
+      try {
+        // Step3: write these deletion vectors.
+        val newIndexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+
+        // Step4: mark the touched index files as DELETE if needed.
+        val rewriteIndexCommitMsg = fileStore.bucketMode() match {
+          case BucketMode.BUCKET_UNAWARE =>
+            val indexEntries = getDeletedIndexFiles(dataFilePathToMeta, deletionVectors)
+            writer.buildCommitMessageFromIndexManifestEntry(indexEntries)
+          case _ =>
+            Seq.empty[CommitMessage]
+        }

-    // Step3: the smallest range of data files that need to be rewritten.
-    val touchedFiles = touchedFilePaths.map {
-      file => fileNameToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file"))
-    }
+        newIndexCommitMsg ++ rewriteIndexCommitMsg
+      } finally {
+        deletionVectors.unpersist()
+      }

-    // Step4: build a dataframe that contains the unchanged data, and write out them.
-    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
-      touchedFiles,
-      rawConvertible = true,
-      table.store().pathFactory())
-    val toRewriteScanRelation = Filter(
-      Not(condition),
-      Compatibility.createDataSourceV2ScanRelation(
-        relation,
-        PaimonSplitScan(table, touchedDataSplits),
-        relation.output))
-    val data = createDataset(sparkSession, toRewriteScanRelation)
+    } else {
+      // Step2: extract out the exactly files, which must have at least one record to be updated.
+      val touchedFilePaths =
+        findTouchedFiles(candidateDataSplits, condition, relation, sparkSession)
+
+      // Step3: the smallest range of data files that need to be rewritten.
+      val touchedFiles = touchedFilePaths.map {
+        file =>
+          dataFilePathToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file"))
+      }

-    // only write new files, should have no compaction
-    val addCommitMessage = writer.writeOnly().write(data)
+      // Step4: build a dataframe that contains the unchanged data, and write out them.
+      val touchedDataSplits =
+        SparkDataFileMeta.convertToDataSplits(touchedFiles, rawConvertible = true, pathFactory)
+      val toRewriteScanRelation = Filter(
+        Not(condition),
+        Compatibility.createDataSourceV2ScanRelation(
+          relation,
+          PaimonSplitScan(table, touchedDataSplits),
+          relation.output))
+      val data = createDataset(sparkSession, toRewriteScanRelation)

-    // Step5: convert the deleted files that need to be wrote to commit message.
-    val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+      // only write new files, should have no compaction
+      val addCommitMessage = writer.writeOnly().write(data)

-    addCommitMessage ++ deletedCommitMessage
+      // Step5: convert the deleted files that need to be wrote to commit message.
+      val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+      addCommitMessage ++ deletedCommitMessage
+    }
   }

 }
}
