diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java index d2a20fbe9..20922d1e8 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java @@ -125,6 +125,8 @@ public List execute(List rows, ExecutorContext context) context.getTransientStore().increment(TransientVariableScope.LOCAL, "dq_failure", 1); } throw new ReportErrorAndProceed(message, 1); + } else if (context != null && !context.getTransientStore().getVariables().contains("dq_failure")) { + context.getTransientStore().set(TransientVariableScope.LOCAL, "dq_failure", 0L); } } catch (ELException e) { throw new DirectiveExecutionException(NAME, e.getMessage(), e); diff --git a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java index f981e14a5..718b2f488 100644 --- a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java +++ b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java @@ -107,4 +107,29 @@ public void testErrorConditionTrueAndContinueWithTransientVariable() throws Exce Assert.assertEquals(2, errors.size()); Assert.assertEquals(3, results.size()); } + + @Test + public void testErrorConditionFalseAndContinueWithTransientVariable() throws Exception { + String[] directives = new String[] { + "parse-as-csv body , true", + "drop body", + "send-to-error-and-continue exp:{body_3 == 'xyzw'} 'invalid value'", + "send-to-error-and-continue exp:{body_4=='1000'} 'junk' ", + "send-to-error exp:{dq_failure >= 1} " + }; + + List rows = Arrays.asList( + new Row("body", "1020134.298,,1,2,2 "), + new Row("body", "1020134.298,,xx,1,3"), + new Row("body", "1020134.298,,4,1,4"), + new Row("body", "1020134.298,,4,2,5"), + new Row("body", "1020134.298,,1,2,1") + ); + + RecipePipeline pipeline = TestingRig.execute(directives); + List results = pipeline.execute(rows); + List errors = pipeline.errors(); + Assert.assertEquals(0, errors.size()); + Assert.assertEquals(5, results.size()); + } }