Commit

Merge pull request #197 from IBMStreams/develop
3.3.0
markheger authored Oct 10, 2019
2 parents 668dfec + 403515b commit 53666a8
Showing 9 changed files with 415 additions and 30 deletions.
@@ -111,6 +111,9 @@ set the environment variable like this:

export STREAMS_ADAPTERS_ISS_PAM_DIRECTORY=/home/username/com.ibm.iss.pam

Alternatively, you can add the 'Packet Analysis Module (PAM)' library to the application bundle and
specify its location with the parameters `pamLibrary` and `pamInclude` (since release v3.3.0).

# Threads

The PacketContentAssembler runs on the thread of the upstream operator that sends
@@ -247,15 +250,36 @@ memory.

This optional parameter is a pathname for the PAM library, a
Linux shared object library which implements the packet assembly engine.
If the pathname value specified is not absolute, then it is
relative to the directory specified by the STREAMS_ADAPTERS_ISS_PAM_DIRECTORY
environment variable at compile time.
The default value is 'iss-pam1.so'.
This parameter is required only if you have not set the
STREAMS_ADAPTERS_ISS_PAM_DIRECTORY environment variable.

Example value for the PAM library: `getThisToolkitDir()+"/etc/iss-pam1.so"`

</description>
<optional>true</optional>
<rewriteAllowed>true</rewriteAllowed>
<expressionMode>AttributeFree</expressionMode>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>

<parameter>
<name>pamInclude</name>
<description>

This optional parameter is a pathname for the PAM header file.
The pathname value specified is relative to the application directory at compile time.
The default value is 'pam.h'.
This parameter is required only if you have not set the
STREAMS_ADAPTERS_ISS_PAM_DIRECTORY environment variable at compile time.

Example value for the PAM header file: `"etc/pam.h"`

</description>
<optional>true</optional>
<rewriteAllowed>false</rewriteAllowed>
<expressionMode>Constant</expressionMode>
<expressionMode>Expression</expressionMode>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
@@ -29,7 +29,7 @@ my $timestampAttribute = $model->getParameterByName("timestampAttribute")->getVa

my $fileChunkSize = $model->getParameterByName("fileChunkSize") ? $model->getParameterByName("fileChunkSize")->getValueAt(0)->getCppExpression() : 100*1024;
my $maximumFilesPerFlow = $model->getParameterByName("maximumFilesPerFlow") ? $model->getParameterByName("maximumFilesPerFlow")->getValueAt(0)->getCppExpression() : 0 ;
my $pamLibrary = $model->getParameterByName("pamLibrary") ? $model->getParameterByName("pamLibrary")->getValueAt(0)->getCppExpression() : "iss-pam1.so" ;
my $pamLibraryParameter = $model->getParameterByName("pamLibrary");

# get string arrays for optional parameters that have lists of values

@@ -69,7 +69,6 @@ for (my $i=0; $i<$model->getNumberOfOutputPorts(); $i++) {

print "// operator parameter 'packetAttribute': $packetAttribute\n";
print "// operator parameter 'timestampAttribute': $timestampAttribute\n";
print "// operator parameter 'pamLibrary': $pamLibrary\n";
print "// operator parameter 'fileChunkSize': $fileChunkSize\n";
foreach my $p (@pamTuning) { print "// operator parameter 'pamTuning': $p\n"; }
foreach my $f (sort keys %resultFunctions) { print "// assembler result function $f()\n"; }
@@ -1066,8 +1065,6 @@ void MY_OPERATOR::submitOutputTuple(const OutputEvent event, const uint8_t* data
}




// Constructor
MY_OPERATOR::MY_OPERATOR()
{
@@ -1076,7 +1073,12 @@ MY_OPERATOR::MY_OPERATOR()
// initialize operator parameters
fileChunkSize = <%=$fileChunkSize%>;
maximumFilesPerFlow = <%=$maximumFilesPerFlow%>;
pamLibrary = "<%=$pamLibrary%>";
<% if ($pamLibraryParameter) {%>
SPL::rstring pamLibraryStr = <%=$model->getParameterByName("pamLibrary")->getValueAt(0)->getCppExpression()%>;
pamLibrary = pamLibraryStr.c_str();
<% } else {%>
pamLibrary = "iss-pam1.so";
<% }%>
<% foreach my $parameter (@pamTuning) { print "pamTuning.push_back($parameter); " } %> ;

// initialize operator state variables
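
The declared type of the `pamLibrary` member is not visible in this hunk. If it is an owning string type, the assignment from `c_str()` copies the characters and nothing more is needed. If it were a raw `const char*`, the pointer would refer to the local `pamLibraryStr` and would dangle once the constructor returns. A minimal sketch of the owning variant follows; `PamConfig` and `libraryPath()` are hypothetical names, and plain `std::string` stands in for `SPL::rstring`.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the operator class; not the toolkit's actual code.
class PamConfig {
public:
    // 'configured' plays the role of the optional pamLibrary parameter value;
    // an empty string means the parameter was not specified.
    explicit PamConfig(const std::string& configured)
        : pamLibrary_(configured.empty() ? std::string("iss-pam1.so") : configured) {}

    // The returned pointer stays valid for as long as this object lives,
    // because the member owns the characters.
    const char* libraryPath() const { return pamLibrary_.c_str(); }

private:
    std::string pamLibrary_;  // owning copy, not a borrowed pointer
};
```

Keeping the string as a member means any later call that needs a C string can ask for it at the point of use.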
@@ -26,7 +26,40 @@
#include <arpa/inet.h>
#include <netinet/ether.h>



<%
my $pamInclude = $model->getParameterByName("pamInclude") ? $model->getParameterByName("pamInclude")->getValueAt(0)->getSPLExpression() : "'pam.h'";
if ($pamInclude) {
# remove quotes
$pamInclude = substr($pamInclude, 1, length($pamInclude)-2);
SPL::CodeGen::printlnVerbose("pamInclude=".$pamInclude);

# try to find pam.h relative to applicationDirectory
my $applicationDirectory = $model->getContext()->getApplicationDirectory();
SPL::CodeGen::printlnVerbose("applicationDirectory=".$applicationDirectory);

search_dir ($applicationDirectory);
sub search_dir {
my $path = shift;
my @dir_entries = glob("$path/*");
foreach my $entry (@dir_entries) {
SPL::CodeGen::printlnVerbose("$entry") if -f $entry;
SPL::CodeGen::printlnVerbose("$entry") and search_dir($entry) if -d $entry
}
}
my $pamHeader = $applicationDirectory."/".$pamInclude;
SPL::CodeGen::printlnVerbose("pamHeader=".$pamHeader);
if (-f $pamHeader) {%>
#include "<%=$pamHeader%>"
<%} else {
SPL::CodeGen::warnln("File not found:".$pamHeader);%>
#include "pam.h"
<%}
} else {%>
#include "pam.h"
<%}%>
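
The template above makes its decision in Perl at code-generation time: if the header exists under the application directory, include it by full path, otherwise fall back to `pam.h` and rely on the compiler's include path. The same decision can be sketched in C++17 for illustration. `resolvePamHeader` is a hypothetical helper, not part of the toolkit.

```cpp
#include <cassert>
#include <filesystem>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// Hypothetical helper mirroring the template's choice of include path.
// Returns the full path when the header exists as a regular file under the
// application directory, otherwise the bare fallback name "pam.h".
std::string resolvePamHeader(const fs::path& applicationDirectory,
                             const std::string& pamInclude) {
    const fs::path candidate = applicationDirectory / pamInclude;
    std::error_code ec;  // non-throwing overload: any error counts as "not found"
    if (fs::is_regular_file(candidate, ec)) {
        return candidate.string();
    }
    return "pam.h";
}
```

For example, `resolvePamHeader(appDir, "etc/pam.h")` yields the path under `appDir` when that file is present and `"pam.h"` otherwise.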




@@ -169,6 +169,23 @@ The format of 'geohash' codes for latitude and longitude coordinates is describe
<type>boolean</type>
<cardinality>-1</cardinality>
</parameter>
<parameter>
<name>initOnTuple</name>
<description>

This optional parameter takes an expression of type 'boolean'
that specifies whether or not the operator should load the files specified with the `geographyDirectory` parameter on the first tuple.

The default value is 'false', in which case the operator loads the files during operator startup.
If loading the files during startup causes timeouts, set this parameter to 'true' to defer loading until the first tuple arrives.

</description>
<optional>true</optional>
<rewriteAllowed>true</rewriteAllowed>
<expressionMode>Expression</expressionMode>
<type>boolean</type>
<cardinality>-1</cardinality>
</parameter>
</parameters>
<inputPorts>
<inputPortSet>
@@ -184,6 +201,26 @@ The format of 'geohash' codes for latitude and longitude coordinates is describe
<cardinality>1</cardinality>
<optional>false</optional>
</inputPortSet>
<inputPortSet>
<description>Control port that ingests a file path pointing to a MaxMind GeoIP2 (or GeoLite2) database CSV file. The operator determines whether the incoming file path refers
to a &quot;locations&quot; file, &quot;IPv4 blocks&quot; file or &quot;IPv6 blocks&quot; file based on the name of the file. The expected file names are:

* For the &quot;locations&quot; file: `GeoIP2-City-Locations-en.csv` or `GeoLite2-City-Locations-en.csv`
* For IPv4 &quot;blocks&quot; file: `GeoIP2-City-Blocks-IPv4.csv` or `GeoLite2-City-Blocks-IPv4.csv`
* For IPv6 &quot;blocks&quot; file: `GeoIP2-City-Blocks-IPv6.csv` or `GeoLite2-City-Blocks-IPv6.csv`

This control port can be used to dynamically update the operator's internal database. Each time a tuple is received containing a path to one of
the files listed above, the operator will update its internal table with the data in the file.

This input port expects a tuple containing a single attribute of type `rstring`.</description>
<windowingDescription></windowingDescription>
<tupleMutationAllowed>false</tupleMutationAllowed>
<windowingMode>NonWindowed</windowingMode>
<windowPunctuationInputMode>Oblivious</windowPunctuationInputMode>
<controlPort>true</controlPort>
<cardinality>1</cardinality>
<optional>true</optional>
</inputPortSet>
</inputPorts>
<outputPorts>
<outputPortOpenSet>
@@ -23,6 +23,8 @@ my @outputPortList = @{ $model->getOutputPorts() };
# get C++ expressions for getting the values of this operator's parameters
my $geographyDirectory = $model->getParameterByName("geographyDirectory") ? $model->getParameterByName("geographyDirectory")->getValueAt(0)->getCppExpression() : -1;

my $initOnTuple = $model->getParameterByName("initOnTuple") ? $model->getParameterByName("initOnTuple")->getValueAt(0)->getCppExpression() : 0;

# special handling for 'outputFilters' parameter, which may include SPL functions that reference input tuples indirectly
my $outputFilterParameter = $model->getParameterByName("outputFilters");
my @outputFilterList;
@@ -40,6 +42,18 @@ SPL::CodeGen::exit(NetworkResources::NETWORK_NO_OUTPUT_PORTS()) unless scalar(@o
SPL::CodeGen::exit(NetworkResources::NETWORK_NOT_ENOUGH_OUTPUT_FILTERS()) if scalar(@outputFilterList) && scalar(@outputFilterList) < scalar(@outputPortList);
SPL::CodeGen::exit(NetworkResources::NETWORK_TOO_MANY_OUTPUT_FILTERS()) if scalar(@outputFilterList) && scalar(@outputFilterList) > scalar(@outputPortList);


# optional control port - inputPort1
my $inputPort1 = $model->getInputPortAt(1);
my $inputPort1CppType;
my $inputPort1CppName;
my $filenameAttribute;
if(defined $inputPort1) {
$inputPort1CppType = $inputPort1->getCppTupleType();
$inputPort1CppName = $inputPort1->getCppTupleName();
$filenameAttribute = $inputPort1->getAttributeAt(0)->getName();
}

%>


@@ -540,21 +554,31 @@ MY_OPERATOR::MY_OPERATOR()

// set operator parameters
geographyDirectory = <%=$geographyDirectory%>;

initOnTuple = <%=$initOnTuple%>;

// initialize operator state variables
tupleCounter = 0;

// clear the output tuples
<% for (my $i=0; $i<$model->getNumberOfOutputPorts(); $i++) { %> ;
outTuple<%=$i%>.clear();
<% } %> ;

if (initOnTuple == false) {
loadGeoData();
}
SPLAPPTRC(L_TRACE, "leaving <%=$myOperatorKind%> constructor ...", "IPAddressLocation");
}

void MY_OPERATOR::loadGeoData() {
SPLAPPTRC(L_INFO, "loadGeoData ...", "IPAddressLocation");
// clear the IPv4 address/subnet cache
for (int i=0; i<ipv4SubnetCacheSize; i++) {
ipv4SubnetCache[i].address = 0;
ipv4SubnetCache[i].subnet = NULL;
}

// clear the output tuples
<% for (my $i=0; $i<$model->getNumberOfOutputPorts(); $i++) { %> ;
outTuple<%=$i%>.clear();
<% } %> ;

// load the city location data
cityLocations.clear();

@@ -592,10 +616,10 @@ MY_OPERATOR::MY_OPERATOR()
} else {
THROW(SPLRuntimeOperator, "Sorry, " << geographyDirectory << " does not contain a 'City-Blocks-IPv6.csv' file");
}

SPLAPPTRC(L_TRACE, "leaving <%=$myOperatorKind%> constructor ...", "IPAddressLocation");
SPLAPPTRC(L_INFO, "loadGeoData completed", "IPAddressLocation");
}


// Destructor
MY_OPERATOR::~MY_OPERATOR()
{
@@ -638,22 +662,71 @@ void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)

SPL::AutoPortMutex automutex(processMutex, *this);

// increment tuple counter
tupleCounter++;
if(port == 0)
{
if ((initOnTuple) && (0 == tupleCounter)) {
loadGeoData();
}

// increment tuple counter
tupleCounter++;

// point at the input tuple
const IPort0Type& iport$0 = tuple;
// point at the input tuple
const IPort0Type& iport$0 = tuple;

// fill in and submit output tuples, as selected by output filters, if specified
<% for (my $i=0; $i<$model->getNumberOfOutputPorts(); $i++) { %> ;
<% if (scalar($outputFilterList[$i])) { print "if ($outputFilterList[$i])"; } %>
{
<% CodeGenX::copyOutputAttributesFromInputAttributes("outTuple$i", $model->getOutputPortAt($i), $model->getInputPortAt(0)); %> ;
<% CodeGenX::assignOutputAttributeValues("outTuple$i", $model->getOutputPortAt($i)); %> ;
SPLAPPTRC(L_TRACE, "submitting outTuple<%=$i%>=" << outTuple<%=$i%>, "IPAddressLocation");
submit(outTuple<%=$i%>, <%=$i%>);
// fill in and submit output tuples, as selected by output filters, if specified
<% for (my $i=0; $i<$model->getNumberOfOutputPorts(); $i++) { %> ;
<% if (scalar($outputFilterList[$i])) { print "if ($outputFilterList[$i])"; } %>
{
<% CodeGenX::copyOutputAttributesFromInputAttributes("outTuple$i", $model->getOutputPortAt($i), $model->getInputPortAt(0)); %> ;
<% CodeGenX::assignOutputAttributeValues("outTuple$i", $model->getOutputPortAt($i)); %> ;
SPLAPPTRC(L_TRACE, "submitting outTuple<%=$i%>=" << outTuple<%=$i%>, "IPAddressLocation");
submit(outTuple<%=$i%>, <%=$i%>);
}
<% } %>;

}
<% if(defined $inputPort1) {%>
else if(port == 1)
{
const <%=$inputPort1CppType%> & <%=$inputPort1CppName%> = static_cast <<%=$inputPort1CppType%>>(tuple);

// regex to determine if incoming file is BLOCKS or LOCATION
streams_boost::filesystem::path filePath(<%=$inputPort1CppName%>.get_<%=$filenameAttribute%>());
SPL::list<rstring> blocksMatch;

// Blocks IPv4
const rstring blocksIPv4Regex("^(GeoIP2|GeoLite2)-City-Blocks-IPv4.csv");
blocksMatch = SPL::Functions::String::regexMatchPerl(filePath.filename().string(), blocksIPv4Regex);
if(blocksMatch.size() > 1)
{
ipv4Subnets.clear();
loadIPv4Subnets(filePath.string());
return;
}

// Blocks IPv6
const rstring blocksIPv6Regex("^(GeoIP2|GeoLite2)-City-Blocks-IPv6.csv");
blocksMatch = SPL::Functions::String::regexMatchPerl(filePath.filename().string(), blocksIPv6Regex);
if(blocksMatch.size() > 1)
{
ipv6Subnets.clear();
loadIPv6Subnets(filePath.string());
return;
}

// Location Regex
const rstring locationRegex("^(GeoIP2|GeoLite2)-City-Locations-en.csv");
SPL::list<rstring> locationMatch = SPL::Functions::String::regexMatchPerl(filePath.filename().string(), locationRegex);

if(locationMatch.size() > 1)
{
cityLocations.clear();
loadCityLocations(filePath.string());
return;
}
}
<% } %> ;
<%}%>

SPLAPPTRC(L_TRACE, "leaving <%=$myOperatorKind%> process() ...", "IPAddressLocation");
}
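
The control-port branch above dispatches on the file name alone. As an illustration, the classification step can be isolated into a small standard-library function. This is a sketch, not the operator's code: it uses `std::regex` in place of `SPL::Functions::String::regexMatchPerl`, plain string handling in place of `streams_boost::filesystem`, and the names `GeoFileKind` and `classifyGeoFile` are hypothetical. This variant escapes the dot before `csv` and requires the whole file name to match.

```cpp
#include <cassert>
#include <regex>
#include <string>

enum class GeoFileKind { Locations, BlocksIPv4, BlocksIPv6, Unknown };

// Hypothetical helper: classify a path by its file name only.
GeoFileKind classifyGeoFile(const std::string& path) {
    // Strip any leading directories.
    const std::string::size_type slash = path.find_last_of('/');
    const std::string name =
        (slash == std::string::npos) ? path : path.substr(slash + 1);

    static const std::regex blocksV4("(GeoIP2|GeoLite2)-City-Blocks-IPv4\\.csv");
    static const std::regex blocksV6("(GeoIP2|GeoLite2)-City-Blocks-IPv6\\.csv");
    static const std::regex locations("(GeoIP2|GeoLite2)-City-Locations-en\\.csv");

    // std::regex_match succeeds only if the whole name matches the pattern.
    if (std::regex_match(name, blocksV4)) return GeoFileKind::BlocksIPv4;
    if (std::regex_match(name, blocksV6)) return GeoFileKind::BlocksIPv6;
    if (std::regex_match(name, locations)) return GeoFileKind::Locations;
    return GeoFileKind::Unknown;  // caller can ignore or log unrecognised names
}
```

Returning an explicit `Unknown` value gives the caller a place to trace paths that match none of the expected names.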
@@ -48,6 +48,8 @@ class MY_OPERATOR : public MY_BASE_OPERATOR
void process(Tuple const & tuple, uint32_t port);
void process(Punctuation const & punct, uint32_t port);

void loadGeoData();

// ----------- this structure contains geographical locations ----------

struct CityLocation {
@@ -120,6 +122,7 @@ class MY_OPERATOR : public MY_BASE_OPERATOR

std::string geographyDirectory;
uint32_t ipAddressAttributesCount;
bool initOnTuple;

// ----------- output tuples ----------
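
The `loadGeoData()` method and the `initOnTuple` flag declared in this header implement a deferred-initialization pattern: load in the constructor by default, or on the first tuple when the flag is set. A minimal sketch of that pattern follows. `LazyGeoOperator` and `loadCount()` are hypothetical, and the loader only counts calls instead of reading CSV files.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the operator; loadGeoData() only counts calls here.
class LazyGeoOperator {
public:
    explicit LazyGeoOperator(bool initOnTuple)
        : initOnTuple_(initOnTuple), tupleCounter_(0), loadCount_(0) {
        // Default behaviour: load during startup.
        if (!initOnTuple_) {
            loadGeoData();
        }
    }

    void process() {
        // Deferred behaviour: load once, before the first tuple is handled.
        if (initOnTuple_ && tupleCounter_ == 0) {
            loadGeoData();
        }
        ++tupleCounter_;
        // ... look up addresses and submit output tuples ...
    }

    int loadCount() const { return loadCount_; }

private:
    void loadGeoData() { ++loadCount_; }  // placeholder for reading the CSV files

    bool initOnTuple_;
    std::uint64_t tupleCounter_;
    int loadCount_;
};
```

With deferred loading the first tuple pays the loading cost, which moves the delay out of operator startup.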

9 changes: 8 additions & 1 deletion com.ibm.streamsx.network/info.xml
@@ -9,6 +9,13 @@ Operators and functions for ingesting and parsing raw network data at the packet

This is an overview of changes for major and minor version upgrades.

++ What is changed in version 3.3.0

* PacketContentAssembler operator no longer depends on the environment variable STREAMS_ADAPTERS_ISS_PAM_DIRECTORY at build time or run time.
The 'Packet Analysis Module (PAM)' library can be added to the application bundle and its location can be set with the parameters `pamLibrary` and `pamInclude`.
* IPAddressLocation operator: New parameter `initOnTuple` lets the operator load the geography files on the first tuple instead of during operator startup.
* IPAddressLocation operator supports dynamic loading of the MaxMind database, triggered by a tuple on the control port.

++ What is changed in version 3.2.2

* Added the static keyword to a couple of IPv6 helper functions
@@ -25,7 +32,7 @@ This is an overview of changes for major and minor version upgrades.
* IPFilter operator: Changed traces with WARN trace level to TRACE level.

</info:description>
<info:version>3.2.2</info:version>
<info:version>3.3.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>