Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extend NGSI-LD support for NGSIToPostgreSQL processor (and other global upgrades) #56

8 changes: 4 additions & 4 deletions nifi-ngsi-bundle/nifi-ngsi-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<packaging>jar</packaging>
<properties>
<mongo.driver.version>3.12.7</mongo.driver.version>
<nifi.version>1.13.0</nifi.version>
<nifi.version>1.14.0</nifi.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -213,7 +213,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-utils</artifactId>
<version>1.13.0</version>
<version>${nifi.version}</version>
</dependency>


Expand Down Expand Up @@ -262,8 +262,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
import org.apache.nifi.processors.ngsi.ngsi.backends.PostgreSQLBackend;
import org.apache.nifi.processors.ngsi.ngsi.utils.Entity;
import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIConstants.POSTGRESQL_COLUMN_TYPES;
import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIEvent;
import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIUtils;
import org.apache.nifi.util.db.JdbcCommon;;
import org.apache.nifi.util.db.JdbcCommon;
import java.sql.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,8 +29,8 @@

@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Postgresql","sql", "put", "rdbms", "database", "create", "insert", "relational","NGSIv2", "NGSI","FIWARE"})
@CapabilityDescription("Create a Data Base if not exits using the information coming from and NGSI event converted to flow file." +
@Tags({"Postgresql","sql", "put", "rdbms", "database", "create", "insert", "relational","NGSIv2", "NGSI", "NGSI-LD", "FIWARE"})
@CapabilityDescription("Create a database if not exists using the information coming from an NGSI event converted to flow file." +
"After insert all of the vales of the flow file content extraction the entities and attributes")

@WritesAttributes({
Expand Down Expand Up @@ -97,6 +98,15 @@ public class NGSIToPostgreSQL extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

protected static final PropertyDescriptor DATASETID_PREFIX_TRUNCATE = new PropertyDescriptor.Builder()
.name("datasetid-prefix-truncate")
.displayName("Dataset id prefix to truncate")
.description("Prefix to truncate from dataset ids when generating column names for multi-attributes")
.required(false)
.defaultValue("")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

protected static final PropertyDescriptor ENABLE_ENCODING= new PropertyDescriptor.Builder()
.name("enable-encoding")
.displayName("Enable Encoding")
Expand Down Expand Up @@ -167,6 +177,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
properties.add(ATTR_PERSISTENCE);
properties.add(DEFAULT_SERVICE);
properties.add(DEFAULT_SERVICE_PATH);
properties.add(DATASETID_PREFIX_TRUNCATE);
properties.add(ENABLE_ENCODING);
properties.add(CKAN_COMPATIBILITY);
properties.add(ENABLE_LOWERCASE);
Expand Down Expand Up @@ -246,14 +257,49 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun
final String fiwareService = (event.getFiwareService().compareToIgnoreCase("nd")==0)?context.getProperty(DEFAULT_SERVICE).getValue():event.getFiwareService();
final String fiwareServicePath = ("ld".equals(context.getProperty(NGSI_VERSION).getValue()))?"":(event.getFiwareServicePath().compareToIgnoreCase("/nd")==0)?context.getProperty(DEFAULT_SERVICE_PATH).getValue():event.getFiwareServicePath();
try {
final String schemaName = postgres.buildSchemaName(fiwareService, context.getProperty(ENABLE_ENCODING).asBoolean(), context.getProperty(ENABLE_LOWERCASE).asBoolean(),context.getProperty(CKAN_COMPATIBILITY).asBoolean());
ArrayList<Entity> entities= new ArrayList<>();
entities = ("ld".equals(context.getProperty(NGSI_VERSION).getValue()))?event.getEntitiesLD():event.getEntities();
final String schemaName =
postgres.buildSchemaName(
fiwareService,
context.getProperty(ENABLE_ENCODING).asBoolean(),
context.getProperty(ENABLE_LOWERCASE).asBoolean(),
context.getProperty(CKAN_COMPATIBILITY).asBoolean()
);
ArrayList<Entity> entities =
"ld".equals(context.getProperty(NGSI_VERSION).getValue()) ? event.getEntitiesLD() : event.getEntities();

for (Entity entity : entities) {
ArrayList<String> listOfFields= postgres.listOfFields(context.getProperty(ATTR_PERSISTENCE).getValue(), entity,context.getProperty(NGSI_VERSION).getValue(),context.getProperty(CKAN_COMPATIBILITY).asBoolean());
String tableName = postgres.buildTableName(fiwareServicePath, entity, context.getProperty(DATA_MODEL).getValue(), context.getProperty(ENABLE_ENCODING).asBoolean(), context.getProperty(ENABLE_LOWERCASE).asBoolean(),context.getProperty(NGSI_VERSION).getValue(),context.getProperty(CKAN_COMPATIBILITY).asBoolean());
final String sql = postgres.insertQuery(entity, creationTime, fiwareServicePath, schemaName, tableName,listOfFields, context.getProperty(ATTR_PERSISTENCE).getValue(),context.getProperty(NGSI_VERSION).getValue(),context.getProperty(CKAN_COMPATIBILITY).asBoolean());
Map<String, POSTGRESQL_COLUMN_TYPES> listOfFields =
postgres.listOfFields(
context.getProperty(ATTR_PERSISTENCE).getValue(),
entity,
context.getProperty(NGSI_VERSION).getValue(),
context.getProperty(CKAN_COMPATIBILITY).asBoolean(),
context.getProperty(DATASETID_PREFIX_TRUNCATE).getValue()
);
String tableName =
postgres.buildTableName(
fiwareServicePath,
entity,
context.getProperty(DATA_MODEL).getValue(),
context.getProperty(ENABLE_ENCODING).asBoolean(),
context.getProperty(ENABLE_LOWERCASE).asBoolean(),
context.getProperty(NGSI_VERSION).getValue(),
context.getProperty(CKAN_COMPATIBILITY).asBoolean()
);
final String sql =
postgres.insertQuery(
entity,
creationTime,
fiwareServicePath,
schemaName,
tableName,
listOfFields,
context.getProperty(ATTR_PERSISTENCE).getValue(),
context.getProperty(NGSI_VERSION).getValue(),
context.getProperty(CKAN_COMPATIBILITY).asBoolean(),
context.getProperty(DATASETID_PREFIX_TRUNCATE).getValue()
);
getLogger().debug("Prepared insert query: {}", sql);
// Get or create the appropriate PreparedStatement to use.
final StatementFlowFileEnclosure enclosure = sqlToEnclosure
.computeIfAbsent(sql, k -> {
Expand All @@ -264,19 +310,18 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun

if (!exceptionHandler.execute(fc, flowFile, input -> {
final PreparedStatement stmt = enclosure.getCachedStatement(conn);
ArrayList<String> newColumns = new ArrayList<>();
JdbcCommon.setParameters(stmt, flowFile.getAttributes());
try {
System.out.println(postgres.checkColumnNames(tableName));
getLogger().info("Gonna create schema {}", schemaName);
conn.createStatement().execute(postgres.createSchema(schemaName));
conn.createStatement().execute(postgres.createTable(schemaName, tableName,listOfFields));
getLogger().info("Gonna create table {} with columns {}", tableName, listOfFields);
conn.createStatement().execute(postgres.createTable(schemaName, tableName, listOfFields));
ResultSet rs = conn.createStatement().executeQuery(postgres.checkColumnNames(tableName));
newColumns = postgres.getNewColumns(rs,listOfFields);
if (newColumns.size()>0){
conn.createStatement().execute(postgres.addColumns(schemaName,tableName,newColumns));
Map<String, POSTGRESQL_COLUMN_TYPES> newColumns = postgres.getNewColumns(rs, listOfFields);
if (newColumns.size() > 0) {
getLogger().info("Identified new columns to create: {}", newColumns);
conn.createStatement().execute(postgres.addColumns(schemaName, tableName, newColumns));
}
System.out.println(schemaName+"."+tableName+" columns -------- : ");

} catch (SQLException s) {
getLogger().error(s.toString());
}
Expand Down
Loading
Loading