From e8c0ea99b0122e8ceadb2ec2af06d57b27d7e847 Mon Sep 17 00:00:00 2001 From: Sagar Ahire Date: Tue, 13 Feb 2024 01:06:05 +0530 Subject: [PATCH] Initialize dq_failure before use in send-to-error directive --- pom.xml | 2 +- .../row/SendToErrorAndContinue.java | 2 ++ .../row/SendToErrorAndContinueTest.java | 25 +++++++++++++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index df806a290..b861520c9 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ io.cdap.wrangler wrangler - 4.8.2 + 4.8.3-SNAPSHOT Wrangler pom An interactive tool for data cleansing and transformation. diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java index 2acf52d5a..163511dc6 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java @@ -121,6 +121,8 @@ public List execute(List rows, ExecutorContext context) context.getTransientStore().increment(TransientVariableScope.LOCAL, "dq_failure", 1); } throw new ReportErrorAndProceed(message, 1); + } else if (context != null && !context.getTransientStore().getVariables().contains("dq_failure")) { + context.getTransientStore().set(TransientVariableScope.LOCAL, "dq_failure", 0L); } } catch (ELException e) { throw new DirectiveExecutionException(NAME, e.getMessage(), e); diff --git a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java index f981e14a5..718b2f488 100644 --- a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java +++ b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java @@ -107,4 +107,29 @@ public void testErrorConditionTrueAndContinueWithTransientVariable() throws Exce Assert.assertEquals(2, errors.size()); Assert.assertEquals(3, results.size()); } + + @Test + public void testErrorConditionFalseAndContinueWithTransientVariable() throws Exception { + String[] directives = new String[] { + "parse-as-csv body , true", + "drop body", + "send-to-error-and-continue exp:{body_3 == 'xyzw'} 'invalid value'", + "send-to-error-and-continue exp:{body_4=='1000'} 'junk' ", + "send-to-error exp:{dq_failure >= 1} " + }; + + List rows = Arrays.asList( + new Row("body", "1020134.298,,1,2,2 "), + new Row("body", "1020134.298,,xx,1,3"), + new Row("body", "1020134.298,,4,1,4"), + new Row("body", "1020134.298,,4,2,5"), + new Row("body", "1020134.298,,1,2,1") + ); + + RecipePipeline pipeline = TestingRig.execute(directives); + List results = pipeline.execute(rows); + List errors = pipeline.errors(); + Assert.assertEquals(0, errors.size()); + Assert.assertEquals(5, results.size()); + } }