diff --git a/pom.xml b/pom.xml
index df806a290..b861520c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
io.cdap.wrangler
wrangler
- 4.8.2
+ 4.8.3-SNAPSHOT
Wrangler
pom
An interactive tool for data cleansing and transformation.
diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java
index 2acf52d5a..163511dc6 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java
@@ -121,6 +121,8 @@ public List execute(List rows, ExecutorContext context)
context.getTransientStore().increment(TransientVariableScope.LOCAL, "dq_failure", 1);
}
throw new ReportErrorAndProceed(message, 1);
+ } else if (context != null && !context.getTransientStore().getVariables().contains("dq_failure")) {
+ context.getTransientStore().set(TransientVariableScope.LOCAL, "dq_failure", 0L);
}
} catch (ELException e) {
throw new DirectiveExecutionException(NAME, e.getMessage(), e);
diff --git a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java
index f981e14a5..718b2f488 100644
--- a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java
+++ b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java
@@ -107,4 +107,29 @@ public void testErrorConditionTrueAndContinueWithTransientVariable() throws Exce
Assert.assertEquals(2, errors.size());
Assert.assertEquals(3, results.size());
}
+
+ @Test
+ public void testErrorConditionFalseAndContinueWithTransientVariable() throws Exception {
+ String[] directives = new String[] {
+ "parse-as-csv body , true",
+ "drop body",
+ "send-to-error-and-continue exp:{body_3 == 'xyzw'} 'invalid value'",
+ "send-to-error-and-continue exp:{body_4=='1000'} 'junk' ",
+ "send-to-error exp:{dq_failure >= 1} "
+ };
+
+ List rows = Arrays.asList(
+ new Row("body", "1020134.298,,1,2,2 "),
+ new Row("body", "1020134.298,,xx,1,3"),
+ new Row("body", "1020134.298,,4,1,4"),
+ new Row("body", "1020134.298,,4,2,5"),
+ new Row("body", "1020134.298,,1,2,1")
+ );
+
+ RecipePipeline pipeline = TestingRig.execute(directives);
+ List results = pipeline.execute(rows);
+ List errors = pipeline.errors();
+ Assert.assertEquals(0, errors.size());
+ Assert.assertEquals(5, results.size());
+ }
}