Skip to content

Commit

Permalink
AIS-backport
Browse files Browse the repository at this point in the history
  • Loading branch information
Asmoday committed Dec 7, 2023
1 parent 9487390 commit 822eab4
Show file tree
Hide file tree
Showing 12 changed files with 799 additions and 190 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Apache NiFi was made for dataflow. It supports highly configurable directed grap
## Minimum Requirements
* JDK 8 Update 251
* Apache Maven 3.6.0
*Docker 20.10.6 or newer (used during nifi-elasticsearch-processors testing/packaging)

## Getting Started

Expand All @@ -78,7 +79,14 @@ For a more comprehensive guide to development and information about contributing
read through the [NiFi Developer's Guide](https://nifi.apache.org/developer-guide.html).

### Building

- Due to target byte code version set to JDK 11 on all modules, it is highly recommended deploying
nifi source code using the settled JDK (in other case, running tests can cause the "java:warning:
source release 11 requires target release 11" marked as error, which will not allow to run tests/
build modules). However, using JDK 11 can cause nifi-security-utils-api module build failure caused
by running tests which include asserting list of supported TLS protocols versions. To resolve this issue
it is necessary to edit JDK 11 settings (conf/security/java.security) and remove from
jdk.tls.disabledAlgorithms the following protocols: TLSv1, TLSv1.1.
-
Run the following Maven command to build standard project modules using parallel execution:

./mvnw clean install -T2C
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,22 @@ run() {

if [ -n "${run_as_user}" ]; then
preserve_environment=$(grep '^\s*preserve.environment' "${BOOTSTRAP_CONF}" | cut -d'=' -f2 | tr '[:upper:]' '[:lower:]')
SUDO="sudo"
if [ "$preserve_environment" = "true" ]; then
SUDO="sudo -E"
use_su=$(grep '^\s*use.su' "${BOOTSTRAP_CONF}" | cut -d'=' -f2 | tr '[:upper:]' '[:lower:]')

if [ "$use_su" = "true" ]; then
SUDO="/bin/su"
if [ "$preserve_environment" = "true" ]; then
echo "The use.su option is not supported with preserve.environment enabled for compatibility reasons. Exiting."
exit 1
fi
else
SUDO="sudo -u"
if [ "$preserve_environment" = "true" ]; then
SUDO="sudo -E -u"
fi
fi
# Provide SCRIPT_DIR and execute nifi-env for the run.as user command
RUN_MINIFI_CMD="${SUDO} -u ${run_as_user} sh -c \"SCRIPT_DIR='${SCRIPT_DIR}' && . '${SCRIPT_DIR}/minifi-env.sh' && ${RUN_MINIFI_CMD}\""
RUN_MINIFI_CMD="${SUDO} ${run_as_user} -s /bin/sh -c \"SCRIPT_DIR='${SCRIPT_DIR}' && . '${SCRIPT_DIR}/minifi-env.sh' && ${RUN_MINIFI_CMD}\""
fi

if [ "$1" = "run" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,23 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-standard-record-utils</artifactId>
<version>1.23.2</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ protected JsonNode parseJsonResponse(InputStream in) throws IOException {
return mapper.readTree(in);
}

protected void buildBulkCommand(StringBuilder sb, String index, String docType, String indexOp, String id, String jsonString) {
protected void buildBulkCommand(StringBuilder sb, String index, String docType, String indexOp, String id, String routing, String jsonString) {
if (indexOp.equalsIgnoreCase("index") || indexOp.equalsIgnoreCase("create")) {
sb.append("{\"");
sb.append(indexOp.toLowerCase());
Expand All @@ -301,38 +301,36 @@ protected void buildBulkCommand(StringBuilder sb, String index, String docType,
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\"");
}
if (!StringUtils.isEmpty(routing)){
sb.append(", \"routing\": \"");
sb.append(StringEscapeUtils.escapeJson(routing));
sb.append("\"");
}
sb.append("}}\n");
sb.append(jsonString);
sb.append("\n");
} else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
sb.append("{\"update\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\" } }\n");
addElasticSearchDocMetadata(sb, index, docType, id, routing);
sb.append("{\"doc\": ");
sb.append(jsonString);
sb.append(", \"doc_as_upsert\": ");
sb.append(indexOp.equalsIgnoreCase("upsert"));
sb.append(" }\n");
} else if (indexOp.equalsIgnoreCase("delete")) {
sb.append("{\"delete\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\" } }\n");
addElasticSearchDocMetadata(sb, index, docType, id, routing);
}
}

protected void addElasticSearchDocMetadata(StringBuilder sb, String index, String docType, String id, String routing) {
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\", \"routing\": \"");
sb.append(StringEscapeUtils.escapeJson(routing));
sb.append("\" }}\n");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

public static final PropertyDescriptor ROUTING_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("put-es-routing-attr")
.displayName("Routing Attribute")
.description("The name of the FlowFile attribute containing the routing for the document")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
.build();

private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;

Expand All @@ -154,6 +163,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {

final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ID_ATTRIBUTE);
descriptors.add(ROUTING_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(BATCH_SIZE);
Expand Down Expand Up @@ -215,6 +225,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
final String routing_attribute = context.getProperty(ROUTING_ATTRIBUTE).getValue();

// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
Expand Down Expand Up @@ -278,6 +289,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

final String id = (id_attribute != null) ? file.getAttribute(id_attribute) : null;
final String routing = (routing_attribute != null) ? file.getAttribute(routing_attribute) : null;

// The ID must be valid for all operations except "index". For that case,
// a missing ID indicates one is to be auto-generated by Elasticsearch
Expand Down Expand Up @@ -308,7 +320,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
continue;
}

buildBulkCommand(sb, index, docType, indexOp, id, jsonString);
buildBulkCommand(sb, index, docType, indexOp, id, routing, jsonString);
}
if (!flowFilesToTransfer.isEmpty()) {
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
.defaultValue("index")
.build();

static final PropertyDescriptor ROUTING_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-routing-path")
.displayName("Routing Record Path")
.description("A RecordPath pointing to a field in the record(s) that contains the routing identifier for the document")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
"Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");

Expand Down Expand Up @@ -287,6 +296,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
descriptors.add(RECORD_WRITER);
descriptors.add(LOG_ALL_ERRORS);
descriptors.add(ID_RECORD_PATH);
descriptors.add(ROUTING_RECORD_PATH);
descriptors.add(AT_TIMESTAMP_RECORD_PATH);
descriptors.add(AT_TIMESTAMP);
descriptors.add(INDEX);
Expand Down Expand Up @@ -429,7 +439,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();

final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
final RecordPath idRecordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
final String routingPath = context.getProperty(ROUTING_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath routingRecordPath = StringUtils.isEmpty(routingPath) ? null : recordPathCache.getCompiled(routingPath);
final StringBuilder sb = new StringBuilder();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());

Expand All @@ -445,15 +457,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
while ((record = reader.nextRecord()) != null) {

final String id;
if (recordPath != null) {
Optional<FieldValue> idPathValue = recordPath.evaluate(record).getSelectedFields().findFirst();
if (!idPathValue.isPresent() || idPathValue.get().getValue() == null) {
throw new IdentifierNotFoundException("Identifier Record Path specified but no value was found, transferring {} to failure.");
}
id = idPathValue.get().getValue().toString();
} else {
id = null;
}
final String routing;

id = evaluateRecordPath(idRecordPath, record);

routing = evaluateRecordPath(routingRecordPath, record);

final Object timestamp;
if (atPath != null) {
Expand All @@ -478,7 +486,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
generator.close();
json.append(out.toString(charset.name()));

buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
buildBulkCommand(sb, index, docType, indexOp, id, routing, json.toString());
recordCount++;
}
} catch (IdentifierNotFoundException infe) {
Expand Down Expand Up @@ -681,6 +689,18 @@ private void writeRecord(final Record record, final JsonGenerator generator, fin
generator.writeEndObject();
}

private String evaluateRecordPath(RecordPath routingRecordPath, Record record) throws IdentifierNotFoundException {
if (routingRecordPath != null) {
Optional<FieldValue> routingPathValue = routingRecordPath.evaluate(record).getSelectedFields().findFirst();
if (!routingPathValue.isPresent() || routingPathValue.get().getValue() == null) {
throw new IdentifierNotFoundException("Identifier Record Path specified but no value was found, transferring {} to failure.");
}
return routingPathValue.get().getValue().toString();
} else {
return null;
}
}

private Object coerceTimestampStringToLong(final String stringValue) {
return DataTypeUtils.isLongTypeCompatible(stringValue)
? DataTypeUtils.toLong(stringValue, "@timestamp")
Expand Down
Loading

0 comments on commit 822eab4

Please sign in to comment.