Skip to content

Commit

Permalink
Add informative exception message for deduplication (#471)
Browse files Browse the repository at this point in the history
* Add informative error exception messages

* Add informative error exception messages
  • Loading branch information
ismailsimsek authored Jan 8, 2025
1 parent 197f758 commit 316e657
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer;
import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer;
Expand Down Expand Up @@ -71,7 +76,17 @@ public JsonNode value() {
}

public Long cdcSourceTsMsValue(String cdcSourceTsMsField) {
return value().get(cdcSourceTsMsField).asLong(0);

final JsonNode element = value().get(cdcSourceTsMsField);
if (element == null) {
throw new DebeziumException("Field '" + cdcSourceTsMsField + "' not found in JSON object: " + value());
}

try {
return element.asLong();
} catch (NumberFormatException e) {
throw new DebeziumException("Error converting field '" + cdcSourceTsMsField + "' value '" + element + "' to Long: " + e.getMessage(), e);
}
}

public Operation cdcOpValue(String cdcOpField) {
Expand Down Expand Up @@ -100,7 +115,7 @@ && value().has("tableChanges")) {
return Operation.READ;
} else if (opFieldValue.equals("c")) {
return Operation.INSERT;
}else if (opFieldValue.equals("i")) {
} else if (opFieldValue.equals("i")) {
return Operation.INSERT;
}
throw new DebeziumException("Unexpected `" + cdcOpField + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
import io.debezium.server.iceberg.RecordConverter;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.iceberg.*;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
Expand Down Expand Up @@ -63,14 +67,18 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {
throw new DebeziumException("Cannot deduplicate data with null key! destination:'" + e.destination() + "' event: '" + e.value().toString() + "'");
}

// deduplicate using key(PK)
deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> {
if (this.compareByTsThenOp(oldValue, newValue) <= 0) {
return newValue;
} else {
return oldValue;
}
});
try {
// deduplicate using key(PK)
deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> {
if (this.compareByTsThenOp(oldValue, newValue) <= 0) {
return newValue;
} else {
return oldValue;
}
});
} catch (Exception ex) {
throw new DebeziumException("Failed to deduplicate events", ex);
}
}
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;

import java.time.Duration;
import java.util.List;
Expand All @@ -34,6 +35,7 @@
@QuarkusTestResource(value = CatalogRest.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@DisabledIfEnvironmentVariable(named = "GITHUB_ACTIONS", matches = "true")
public class IcebergChangeConsumerRestCatalogTest extends BaseTest {

@Test
Expand Down

0 comments on commit 316e657

Please sign in to comment.