diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java index afc72bca103e..dfdc3c58e91c 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java @@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.jolt.util.TransformUtils; @@ -41,6 +42,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import java.io.InputStream; @@ -55,11 +57,28 @@ @Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @WritesAttribute(attribute = "mime.type", description = "Always set to application/json") -@CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created " - + "with transformed content and is routed to the 'success' relationship. If the JSON transform " - + "fails, the original FlowFile is routed to the 'failure' relationship.") +@CapabilityDescription("Applies a list of Jolt specifications to either the FlowFile JSON content or a specified FlowFile JSON attribute. " + + "If the JSON transform fails, the original FlowFile is routed to the 'failure' relationship.") @RequiresInstanceClassLoading public class JoltTransformJSON extends AbstractJoltTransform { + + public static final PropertyDescriptor JSON_SOURCE = new PropertyDescriptor.Builder() + .name("JSON Source") + .description("Specifies whether the Jolt transformation is applied to FlowFile JSON content or to specified FlowFile JSON attribute.") + .required(true) + .allowableValues(SourceStrategy.class) + .defaultValue(SourceStrategy.FLOW_FILE) + .build(); + + public static final PropertyDescriptor JSON_SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("JSON Source Attribute") + .description("The FlowFile attribute containing JSON to be transformed.") + .dependsOn(JSON_SOURCE, SourceStrategy.ATTRIBUTE) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .build(); + public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder() .name("Pretty Print") .displayName("Pretty Print") @@ -80,16 +99,18 @@ public class JoltTransformJSON extends AbstractJoltTransform { public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") - .description("The FlowFile with transformed content will be routed to this relationship") + .description("The FlowFile with successfully transformed content or updated attribute will be routed to this relationship") .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship") + .description("If the JSON transformation fails (e.g., due to invalid JSON in the content or attribute), the original FlowFile is routed to this relationship.") .build(); private static final List PROPERTY_DESCRIPTORS = Stream.concat( getCommonPropertyDescriptors().stream(), Stream.of( + JSON_SOURCE, + JSON_SOURCE_ATTRIBUTE, PRETTY_PRINT, MAX_STRING_LENGTH ) @@ -122,14 +143,34 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro final ComponentLog logger = getLogger(); final StopWatch stopWatch = new StopWatch(true); - final Object inputJson; - try (final InputStream in = session.read(original)) { - inputJson = jsonUtil.jsonToObject(in); - } catch (final Exception e) { - logger.error("JSON parsing failed for {}", original, e); - session.transfer(original, REL_FAILURE); - return; + final boolean sourceStrategyFlowFile = SourceStrategy.FLOW_FILE == context.getProperty(JSON_SOURCE).asAllowableValue(SourceStrategy.class); + String jsonSourceAttributeName = null; + + if (sourceStrategyFlowFile) { + try (final InputStream in = session.read(original)) { + inputJson = jsonUtil.jsonToObject(in); + } catch (final Exception e) { + logger.error("JSON parsing failed on FlowFile content for {}", original, e); + session.transfer(original, REL_FAILURE); + return; + } + } else { + jsonSourceAttributeName = context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue(); + final String jsonSourceAttributeValue = original.getAttribute(jsonSourceAttributeName); + if (StringUtils.isBlank(jsonSourceAttributeValue)) { + logger.error("FlowFile attribute '{}' value is blank", jsonSourceAttributeName); + session.transfer(original, REL_FAILURE); + return; + } else { + try { + inputJson = jsonUtil.jsonToObject(jsonSourceAttributeValue); + } catch (final Exception e) { + logger.error("JSON parsing failed on attribute '{}' of FlowFile {}", jsonSourceAttributeName, original, e); + session.transfer(original, REL_FAILURE); + return; + } + } } final String jsonString; @@ -152,13 +193,18 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro } } - FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8))); - - final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); - transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.transfer(transformed, REL_SUCCESS); - session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.info("Transform completed for {}", original); + if (sourceStrategyFlowFile) { + FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8))); + final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); + transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json"); + session.transfer(transformed, REL_SUCCESS); + session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + logger.info("Transform completed on FlowFile content for {}", original); + } else { + session.putAttribute(original, jsonSourceAttributeName, jsonString); + session.transfer(original, REL_SUCCESS); + logger.info("Transform completed on attribute '{}' of FlowFile {}", jsonSourceAttributeName, original); + } } @OnScheduled diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/SourceStrategy.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/SourceStrategy.java new file mode 100644 index 000000000000..30fb973c5256 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/SourceStrategy.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.jolt; + +import org.apache.nifi.components.DescribedValue; + +public enum SourceStrategy implements DescribedValue { + FLOW_FILE("JOLT transformation applied to FlowFile content."), + ATTRIBUTE("JOLT transformation applied to FlowFile attribute."); + + private final String description; + + SourceStrategy(final String description) { + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return name(); + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java index 593e4dc518ff..ccafeba20083 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java @@ -51,6 +51,11 @@ class TestJoltTransformJSON { final static Path JSON_INPUT = Paths.get("src/test/resources/TestJoltTransformJson/input.json"); final static Diffy DIFFY = new Diffy(); final static String CHAINR_SPEC_PATH = "src/test/resources/specs/chainrSpec.json"; + final static String SHIFTR_SPEC_PATH = "src/test/resources/specs/shiftrSpec.json"; + final static String SHIFTR_JSON_OUTPUT = "shiftrOutput.json"; + final static String CHAINR_JSON_OUTPUT = "chainrOutput.json"; + private static final String JSON_SOURCE_ATTR_NAME = "jsonSourceAttr"; + static String chainrSpecContents; private Processor processor; private TestRunner runner; @@ -199,36 +204,35 @@ void testCustomTransformationWithInvalidClassName() throws IOException { @ParameterizedTest(name = "{index} {1}") @MethodSource("getChainrArguments") - /*NOTE: Even though description is not used in the actual test, it needs to be declared in order to use it in the ParameterizedTest name argument*/ + /*NOTE: Even though description is not used in the actual test, it needs to be declared in order to use it in the ParameterizedTest name argument*/ void testTransformInputWithChainr(Path specPath, String ignoredDescription) throws IOException { final String spec = Files.readString(specPath); runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); runner.enqueue(JSON_INPUT); runner.run(); - assertTransformedEquals("chainrOutput.json"); + assertTransformedEquals(CHAINR_JSON_OUTPUT); } @Test void testTransformInputWithShiftr() throws IOException { - final String spec = Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json")); + final String spec = Files.readString(Paths.get(SHIFTR_SPEC_PATH)); runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR); runner.enqueue(JSON_INPUT); runner.run(); - assertTransformedEquals("shiftrOutput.json"); + assertTransformedEquals(SHIFTR_JSON_OUTPUT); } @Test void testTransformInputWithShiftrFromFile() throws IOException { - final String spec = "./src/test/resources/specs/shiftrSpec.json"; - runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); + runner.setProperty(JoltTransformJSON.JOLT_SPEC, SHIFTR_SPEC_PATH); runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR); runner.enqueue(JSON_INPUT); runner.run(); - assertTransformedEquals("shiftrOutput.json"); + assertTransformedEquals(SHIFTR_JSON_OUTPUT); } @Test @@ -243,7 +247,7 @@ void testTransformInputWithShiftrFromFileExpression() throws IOException { runner.enqueue(JSON_INPUT, attributes); runner.run(); - assertTransformedEquals("shiftrOutput.json"); + assertTransformedEquals(SHIFTR_JSON_OUTPUT); } String addAccentedChars(String input) { @@ -252,13 +256,13 @@ String addAccentedChars(String input) { @Test void testTransformInputWithShiftrAccentedChars() throws IOException { - final String spec = addAccentedChars(Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json"))); + final String spec = addAccentedChars(Files.readString(Paths.get(SHIFTR_SPEC_PATH))); runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR); runner.enqueue(addAccentedChars(Files.readString(JSON_INPUT))); runner.run(); - assertTransformedEquals("shiftrOutput.json"); + assertTransformedEquals(SHIFTR_JSON_OUTPUT); } @Test @@ -382,7 +386,7 @@ void testTransformInputWithCustomTransformationWithJar() throws IOException { runner.enqueue(JSON_INPUT); runner.run(); - assertTransformedEquals("chainrOutput.json"); + assertTransformedEquals(CHAINR_JSON_OUTPUT); } @Test @@ -401,7 +405,7 @@ void testExpressionLanguageJarFile() throws IOException { runner.enqueue(JSON_INPUT, customSpecs); runner.run(); - assertTransformedEquals("chainrOutput.json"); + assertTransformedEquals(CHAINR_JSON_OUTPUT); } @Test @@ -414,7 +418,7 @@ void testTransformInputWithCustomTransformationWithDir() throws IOException { runner.enqueue(JSON_INPUT); runner.run(); - assertTransformedEquals("chainrOutput.json"); + assertTransformedEquals(CHAINR_JSON_OUTPUT); } @Test @@ -426,7 +430,7 @@ void testTransformInputWithChainrEmbeddedCustomTransformation() throws IOExcepti runner.enqueue(JSON_INPUT); runner.run(); - assertTransformedEquals("chainrOutput.json"); + assertTransformedEquals(CHAINR_JSON_OUTPUT); } @Test @@ -464,6 +468,55 @@ void testJoltSpecInvalidEL() throws IOException { runner.assertNotValid(); } + private static Stream provideJsonSourceAttributeArguments() { + String INVALID_INPUT_JSON = "{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":}}}"; + String EXPECTED_JSON = "{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":3}}}"; + + return Stream.of( + Arguments.argumentSet("testJsonAttributeNotInitialised", JSON_SOURCE_ATTR_NAME, null, + SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, null), + Arguments.argumentSet("testInvalidJsonAttribute", JSON_SOURCE_ATTR_NAME, Map.of(JSON_SOURCE_ATTR_NAME, INVALID_INPUT_JSON), + SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, null), + Arguments.argumentSet("testValidJsonAttribute", JSON_SOURCE_ATTR_NAME, Map.of(JSON_SOURCE_ATTR_NAME, EXPECTED_JSON), + CHAINR_SPEC_PATH, JoltTransformStrategy.CHAINR, true, CHAINR_JSON_OUTPUT) + ); + } + + @ParameterizedTest + @MethodSource("provideJsonSourceAttributeArguments") + void testJsonSourceAttribute(String jsonSourceAttribute, + Map flowFileAttributes, + String joltSpec, + JoltTransformStrategy joltStrategy, + boolean expectSuccess, + String expectedOutputFile) throws IOException { + runner.setProperty(JoltTransformJSON.JOLT_SPEC, joltSpec); + runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, joltStrategy); + runner.setProperty(JoltTransformJSON.JSON_SOURCE, SourceStrategy.ATTRIBUTE); + runner.setProperty(JoltTransformJSON.JSON_SOURCE_ATTRIBUTE, jsonSourceAttribute); + runner.enqueue(JSON_INPUT, flowFileAttributes != null ? flowFileAttributes : Collections.emptyMap()); + runner.run(); + + if (expectSuccess) { + assertTransformedJsonAttributeEquals(expectedOutputFile); + } else { + runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE); + } + } + + private void assertTransformedJsonAttributeEquals(final String expectedOutputContent) throws IOException { + runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS); + + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).getFirst(); + transformed.assertAttributeExists(JSON_SOURCE_ATTR_NAME); + + final Object transformedJson = JsonUtils.jsonToObject(transformed.getAttribute(JSON_SOURCE_ATTR_NAME)); + + final String compareOutputPath = "src/test/resources/TestJoltTransformJson/%s".formatted(expectedOutputContent); + final Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get(compareOutputPath))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + private void assertTransformedEquals(final String expectedOutputFilename) throws IOException { runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);