Skip to content

NIFI-14337 - Enhance JoltTransformJSON to Support JOLT Transformation… #9785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.jolt.util.TransformUtils;
Expand All @@ -41,6 +42,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;

import java.io.InputStream;
Expand All @@ -55,11 +57,31 @@
@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
@CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the JSON transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
@CapabilityDescription("Applies a list of JOLT specifications to either the FlowFile JSON content or a specified FlowFile JSON attribute. "
+ "When 'Json Source' is set to FLOW_FILE, the FlowFile content is transformed and the modified FlowFile is routed to the 'success' relationship. "
+ "When 'Json Source' is set to ATTRIBUTE, the specified attribute's value is transformed and updated in place, with the FlowFile routed to the 'success' relationship. "
+ "If the JSON transform fails, the original FlowFile is routed to the 'failure' relationship.")
@RequiresInstanceClassLoading
public class JoltTransformJSON extends AbstractJoltTransform {

public static final PropertyDescriptor JSON_SOURCE = new PropertyDescriptor.Builder()
.name("Json Source")
.description("Specifies whether the JOLT transformation is applied to FlowFile JSON content or to specified FlowFile JSON attribute.")
.required(true)
.allowableValues(SourceStrategy.class)
.defaultValue(SourceStrategy.FLOW_FILE)
.build();

public static final PropertyDescriptor JSON_SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Json Source Attribute")
.description("The FlowFile attribute containing JSON to be transformed. "
+ "This property is required only when 'Json Source' is set to ATTRIBUTE.")
.dependsOn(JSON_SOURCE, SourceStrategy.ATTRIBUTE)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
.build();

public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
.name("Pretty Print")
.displayName("Pretty Print")
Expand All @@ -80,16 +102,18 @@ public class JoltTransformJSON extends AbstractJoltTransform {

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The FlowFile with transformed content will be routed to this relationship")
.description("The FlowFile with successfully transformed content or updated attribute will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
.description("If the JSON transformation fails (e.g., due to invalid JSON in the content or attribute), the original FlowFile is routed to this relationship.")
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(
getCommonPropertyDescriptors().stream(),
Stream.of(
JSON_SOURCE,
JSON_SOURCE_ATTRIBUTE,
PRETTY_PRINT,
MAX_STRING_LENGTH
)
Expand Down Expand Up @@ -122,14 +146,34 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro

final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);

final Object inputJson;
try (final InputStream in = session.read(original)) {
inputJson = jsonUtil.jsonToObject(in);
} catch (final Exception e) {
logger.error("JSON parsing failed for {}", original, e);
session.transfer(original, REL_FAILURE);
return;
final boolean isSourceFlowFileContent = SourceStrategy.FLOW_FILE == context.getProperty(JSON_SOURCE).asAllowableValue(SourceStrategy.class);
String jsonSourceAttributeName = null;

if (isSourceFlowFileContent) {
try (final InputStream in = session.read(original)) {
inputJson = jsonUtil.jsonToObject(in);
} catch (final Exception e) {
logger.error("JSON parsing failed on FlowFile content for {}", original, e);
session.transfer(original, REL_FAILURE);
return;
}
} else {
jsonSourceAttributeName = context.getProperty(JSON_SOURCE_ATTRIBUTE).evaluateAttributeExpressions(original).getValue();
final String jsonSourceAttributeValue = original.getAttribute(jsonSourceAttributeName);
if (StringUtils.isBlank(jsonSourceAttributeValue)) {
logger.error("FlowFile attribute value was blank");
session.transfer(original, REL_FAILURE);
return;
} else {
try {
inputJson = jsonUtil.jsonToObject(jsonSourceAttributeValue);
} catch (final Exception e) {
logger.error("JSON parsing failed on FlowFile attribute for {}", original, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be a good idea to include the name of the attribute like you did above.

Suggested change
logger.error("JSON parsing failed on FlowFile attribute for {}", original, e);
logger.error("JSON parsing failed on attribute '{}' of FlowFile {}", jsonSourceAttributeName, original, e);

session.transfer(original, REL_FAILURE);
return;
}
}
}

final String jsonString;
Expand All @@ -152,13 +196,18 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro
}
}

FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));

final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transform completed for {}", original);
if (isSourceFlowFileContent) {
FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transform completed on FlowFile content for {}", original);
} else {
session.putAttribute(original, jsonSourceAttributeName, jsonString);
session.transfer(original, REL_SUCCESS);
logger.info("Transform completed on FlowFile attribute for {}", original);
}
Copy link
Contributor

@dan-s1 dan-s1 Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, it may be beneficial to name the attribute which was transformed so it is clear in the logs

Suggested change
logger.info("Transform completed on FlowFile attribute for {}", original);
}
logger.info("Transform completed on attribute {} of FlowFile {}", sonSourceAttributeName, original);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dan-s1 Updated as per feedback.

}

@OnScheduled
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.jolt;

import org.apache.nifi.components.DescribedValue;

public enum SourceStrategy implements DescribedValue {
FLOW_FILE("JOLT transformation applied to FlowFile content."),
ATTRIBUTE("JOLT transformation applied to FlowFile attribute.");

private final String description;

SourceStrategy(final String description) {
this.description = description;
}

@Override
public String getValue() {
return name();
}

@Override
public String getDisplayName() {
return name();
}

@Override
public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class TestJoltTransformJSON {
final static Path JSON_INPUT = Paths.get("src/test/resources/TestJoltTransformJson/input.json");
final static Diffy DIFFY = new Diffy();
final static String CHAINR_SPEC_PATH = "src/test/resources/specs/chainrSpec.json";
final static String SHIFTR_SPEC_PATH = "src/test/resources/specs/shiftrSpec.json";
final static String SHIFTR_JSON_OUTPUT = "shiftrOutput.json";
final static String CHAINR_JSON_OUTPUT = "chainrOutput.json";
private static final String JSON_SOURCE_ATTR_NAME = "jsonSourceAttr";

static String chainrSpecContents;
private Processor processor;
private TestRunner runner;
Expand Down Expand Up @@ -199,36 +204,35 @@ void testCustomTransformationWithInvalidClassName() throws IOException {

@ParameterizedTest(name = "{index} {1}")
@MethodSource("getChainrArguments")
/*NOTE: Even though description is not used in the actual test, it needs to be declared in order to use it in the ParameterizedTest name argument*/
/*NOTE: Even though description is not used in the actual test, it needs to be declared in order to use it in the ParameterizedTest name argument*/
void testTransformInputWithChainr(Path specPath, String ignoredDescription) throws IOException {
final String spec = Files.readString(specPath);
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.enqueue(JSON_INPUT);
runner.run();

assertTransformedEquals("chainrOutput.json");
assertTransformedEquals(CHAINR_JSON_OUTPUT);
}

@Test
void testTransformInputWithShiftr() throws IOException {
final String spec = Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json"));
final String spec = Files.readString(Paths.get(SHIFTR_SPEC_PATH));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR);
runner.enqueue(JSON_INPUT);
runner.run();

assertTransformedEquals("shiftrOutput.json");
assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}

@Test
void testTransformInputWithShiftrFromFile() throws IOException {
final String spec = "./src/test/resources/specs/shiftrSpec.json";
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.JOLT_SPEC, SHIFTR_SPEC_PATH);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR);
runner.enqueue(JSON_INPUT);
runner.run();

assertTransformedEquals("shiftrOutput.json");
assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}

@Test
Expand All @@ -243,7 +247,7 @@ void testTransformInputWithShiftrFromFileExpression() throws IOException {
runner.enqueue(JSON_INPUT, attributes);
runner.run();

assertTransformedEquals("shiftrOutput.json");
assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}

String addAccentedChars(String input) {
Expand All @@ -252,13 +256,13 @@ String addAccentedChars(String input) {

@Test
void testTransformInputWithShiftrAccentedChars() throws IOException {
final String spec = addAccentedChars(Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json")));
final String spec = addAccentedChars(Files.readString(Paths.get(SHIFTR_SPEC_PATH)));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR);
runner.enqueue(addAccentedChars(Files.readString(JSON_INPUT)));
runner.run();

assertTransformedEquals("shiftrOutput.json");
assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}

@Test
Expand Down Expand Up @@ -382,7 +386,7 @@ void testTransformInputWithCustomTransformationWithJar() throws IOException {
runner.enqueue(JSON_INPUT);
runner.run();

assertTransformedEquals("chainrOutput.json");
assertTransformedEquals(CHAINR_JSON_OUTPUT);
}

@Test
Expand All @@ -401,7 +405,7 @@ void testExpressionLanguageJarFile() throws IOException {
runner.enqueue(JSON_INPUT, customSpecs);
runner.run();

assertTransformedEquals("chainrOutput.json");
assertTransformedEquals(CHAINR_JSON_OUTPUT);
}

@Test
Expand All @@ -414,7 +418,7 @@ void testTransformInputWithCustomTransformationWithDir() throws IOException {
runner.enqueue(JSON_INPUT);
runner.run();

assertTransformedEquals("chainrOutput.json");
assertTransformedEquals(CHAINR_JSON_OUTPUT);
}

@Test
Expand All @@ -426,7 +430,7 @@ void testTransformInputWithChainrEmbeddedCustomTransformation() throws IOExcepti
runner.enqueue(JSON_INPUT);
runner.run();

assertTransformedEquals("chainrOutput.json");
assertTransformedEquals(CHAINR_JSON_OUTPUT);
}

@Test
Expand Down Expand Up @@ -464,6 +468,57 @@ void testJoltSpecInvalidEL() throws IOException {
runner.assertNotValid();
}

private static Stream<Arguments> provideJsonSourceAttributeArguments() {
String INVALID_INPUT_JSON = "{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":}}}";
String EXPECTED_JSON = "{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":3}}}";

return Stream.of(
Arguments.argumentSet("testJsonAttributeNotInitialised", JSON_SOURCE_ATTR_NAME, null,
SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, null),
Arguments.argumentSet("testInvalidJsonAttribute", JSON_SOURCE_ATTR_NAME, Map.of(JSON_SOURCE_ATTR_NAME, INVALID_INPUT_JSON),
SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, null),
Arguments.argumentSet("testValidJsonAttributeEL", "${dynamicJsonAttr}", Map.of("dynamicJsonAttr", JSON_SOURCE_ATTR_NAME, JSON_SOURCE_ATTR_NAME, EXPECTED_JSON),
SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, true, SHIFTR_JSON_OUTPUT),
Arguments.argumentSet("testValidJsonAttributeWithoutEL", JSON_SOURCE_ATTR_NAME, Map.of(JSON_SOURCE_ATTR_NAME, EXPECTED_JSON),
CHAINR_SPEC_PATH, JoltTransformStrategy.CHAINR, true, CHAINR_JSON_OUTPUT)
);
}

@ParameterizedTest
@MethodSource("provideJsonSourceAttributeArguments")
void testJsonSourceAttribute(String jsonSourceAttribute,
Map<String, String> flowFileAttributes,
String joltSpec,
JoltTransformStrategy joltStrategy,
boolean expectSuccess,
String expectedOutputFile) throws IOException {
runner.setProperty(JoltTransformJSON.JOLT_SPEC, joltSpec);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, joltStrategy);
runner.setProperty(JoltTransformJSON.JSON_SOURCE, SourceStrategy.ATTRIBUTE);
runner.setProperty(JoltTransformJSON.JSON_SOURCE_ATTRIBUTE, jsonSourceAttribute);
runner.enqueue(JSON_INPUT, flowFileAttributes != null ? flowFileAttributes : Collections.emptyMap());
runner.run();

if (expectSuccess) {
assertTransformedJsonAttributeEquals(expectedOutputFile);
} else {
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
}
}

private void assertTransformedJsonAttributeEquals(final String expectedOutputContent) throws IOException {
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);

final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).getFirst();
transformed.assertAttributeExists(JSON_SOURCE_ATTR_NAME);

final Object transformedJson = JsonUtils.jsonToObject(transformed.getAttribute(JSON_SOURCE_ATTR_NAME));

final String compareOutputPath = "src/test/resources/TestJoltTransformJson/%s".formatted(expectedOutputContent);
final Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get(compareOutputPath)));
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}

private void assertTransformedEquals(final String expectedOutputFilename) throws IOException {
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);

Expand Down