Skip to content

Commit

Permalink
Add a check for error 71005
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Apr 4, 2024
1 parent 880b960 commit e5dcb4c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
15 changes: 12 additions & 3 deletions src/main/java/io/cdap/plugin/sfmc/sink/DataExtensionClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@
import com.exacttarget.fuelsdk.internal.UpdateRequest;
import com.exacttarget.fuelsdk.internal.UpdateResponse;
import com.exacttarget.fuelsdk.internal.UpdateResult;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Wrapper around an ETClient that understands objects at the level that the plugin cares about.
*/
public class DataExtensionClient {
private static final Set<Integer> PK_ERROR_CODES = ImmutableSet.of(2, 71005);
private final ETClient client;
private final String dataExtensionKey;

Expand Down Expand Up @@ -204,9 +207,7 @@ public List<ETResult<ETDataExtensionRow>> upsert(List<ETDataExtensionRow> rows)

List<ETDataExtensionRow> toUpdate = new ArrayList<>();
for (ETResult<ETDataExtensionRow> row : inserts.getResults()) {
// super hacky to check the error message... but there is no better way
if (row.getStatus() == ETResult.Status.ERROR && row.getErrorCode() == 2 && row.getErrorMessage() != null &&
row.getErrorMessage().toLowerCase().contains("primary key")) {
if (isPrimaryKeyError(row)) {
ETDataExtensionRow failed = row.getObject();
ETDataExtensionRow copy = new ETDataExtensionRow();
copy.setDataExtensionKey(failed.getDataExtensionKey());
Expand All @@ -226,6 +227,14 @@ public List<ETResult<ETDataExtensionRow>> upsert(List<ETDataExtensionRow> rows)
return result;
}

boolean isPrimaryKeyError(ETResult<ETDataExtensionRow> row) {
// super hacky to check the error message... but there is no better way
// error code 2 and 71005 are "Primary key violation"
return row.getStatus() == ETResult.Status.ERROR && row.getErrorCode() != null &&
PK_ERROR_CODES.contains(row.getErrorCode()) && row.getErrorMessage() != null &&
row.getErrorMessage().toLowerCase().contains("primary key");
}

private <T> T call(SFMCCall<T> callable) throws ETSdkException {
ClassLoader oldClassloader = Thread.currentThread().getContextClassLoader();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,5 +277,18 @@ public void testUpsert() throws Exception {
DataExtensionClient dataExtensionClient = Mockito.spy(new DataExtensionClient(client, dataExtensionKey));
Assert.assertNotNull(dataExtensionClient.upsert(row));
}

@Test
public void testIsPrimaryKeyError() {
ETResult<ETDataExtensionRow> mockRow = Mockito.mock(ETResult.class);
Mockito.when(mockRow.getStatus()).thenReturn(ETResult.Status.ERROR);
Mockito.when(mockRow.getErrorCode()).thenReturn(71005);
Mockito.when(mockRow.getErrorMessage()).thenReturn("This is Primary Key Violation on colum abc");
ETClient client = Mockito.mock(ETClient.class);
String dataExtensionKey = "DE";
DataExtensionClient dataExtensionClient = new DataExtensionClient(client, dataExtensionKey);
boolean result = dataExtensionClient.isPrimaryKeyError(mockRow);
Assert.assertTrue(result);
}
}

0 comments on commit e5dcb4c

Please sign in to comment.