This example is a walkthrough for parsing A10 audit logs. The logs are ingested from the Kafka topic a10_proxy and parsed using the syslog_log_audit_a10.proxy config.
The section below assumes you ingest logs from Kafka and want to use our script to generate pipelines. For a manual process and/or a deeper understanding, skip to the Detailed Setup section.
It is also assumed that you are running Logstash on Linux and that the Logstash home is /usr/share/logstash (the Logstash home is the directory that contains pipelines.yml, logstash.yml, jvm.options, etc.).
- Install Logstash >= 7.12
- The plugins below do not come out of the box. Install them by executing:
logstash-plugin install \
logstash-input-okta_system_log \
logstash-filter-json_encode \
logstash-filter-tld
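You can verify the plugins were installed by listing them from the Logstash home (e.g. /usr/share/logstash); the grep pattern below is just a convenience to filter the output:
bin/logstash-plugin list | grep -E 'okta_system_log|json_encode|tld'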
- Setting up Enrichments
- Geoip enrichment
99_geoip.conf uses GeoLite databases for public and private GeoIP enrichments. If you plan to use this enrichment, you should have the GeoIP files at the locations below:
/mnt/s3fs_geoip/GeoLite2-City.mmdb
/mnt/s3fs_geoip/GeoLitePrivate2-City.mmdb
Either remove the enrichment file if you don't want to use it, or just touch the above files if you are disabling the enrichment from settings.json. If you want to use this enrichment, you need to add the GeoIP database files. For more information see Using the Geoip filter.
In this example it is assumed that the geoip enrichment will not be used.
- DNS enrichment
09_dns.conf has the variable VAR_DNS_SERVER for the nameserver definition. Add the server addresses to the LOGSTASH_API_SECRET JSON like this:
export LOGSTASH_API_SECRET='{"dns_server" : "\"127.0.0.1\",\"127.0.0.2\""}'
Here 127.0.0.1 and 127.0.0.2 are nameservers. Make sure you add yours. Remove the enrichment file if you won't be using it.
- memcached/MISP enrichment
100_misp.conf needs memcached endpoints for the MISP enrichment. Add the memcached addresses to the LOGSTASH_API_SECRET JSON like this:
export LOGSTASH_API_SECRET='{"memcached_address" : "\"127.0.0.1\",\"127.0.0.2\""}'
Here 127.0.0.1 and 127.0.0.2 are memcached endpoints. Make sure you add yours. Remove the enrichment file if you won't be using it.
- Kafka
To fetch logs from Kafka you should have a Kafka cluster with access and credentials ready, and logs on the a10_proxy topic. A typical Kafka input config looks like the below (taken from the Kafka input template file):
input {
  kafka {
    bootstrap_servers => "VAR_KAFKA_BOOTSTRAP_SERVERS"          # server addresses
    client_id => "VAR_KAFKA_CLIENT_ID"                          # id for the kafka client
    group_id => "VAR_KAFKA_GROUP_ID"                            # consumer group id
    consumer_threads => VAR_CONSUMER_THREADS                    # number of consumer threads to assign
    ssl_truststore_location => "VAR_KAFKA_CLIENT_TRUSTSTORE"    # truststore file path; trust your server signing certificate in this file
    ssl_truststore_password => "VAR_KAFKA_TRUSTSTORE_PASSWORD"  # ssl truststore password
    jaas_path => "VAR_KAFKA_JAAS_PATH"                          # path to kafka jaas credentials
    client_rack => "VAR_RACK_ID"                                # client rack id
    topics => ["VAR_KAFKA_TOPIC"]                               # topic name
    id => "VAR_LOGSTASH_PLUGIN_ID"                              # an id for this plugin instance
    max_poll_records => VAR_MAX_POLL_RECORDS                    # max number of records to poll each time
    codec => "VAR_CODEC"                                        # codec for incoming events, e.g. json
    partition_assignment_strategy => "cooperative_sticky"       # how partitions are distributed across consumers
    security_protocol => "SASL_SSL"                             # kafka security protocol; change this if you are not using SASL_SSL
    sasl_mechanism => "SCRAM-SHA-512"                           # kafka sasl mechanism; change this if you are not using SCRAM-SHA-512
  }
}
The VAR_* fields need to be passed as environment variables without the VAR_ prefix, e.g. the key VAR_RACK_ID should be passed as
export RACK_ID=kafka_rack1
Most of the above have default values. You can override them in the replace_vars method in generate_pipeline.py.
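For illustration, a few of the Kafka variables from the template above could be exported like this. These names simply follow the drop-the-VAR_-prefix convention described above and are assumptions; the exact names and defaults live in replace_vars in generate_pipeline.py, so check there before relying on them:
export KAFKA_CLIENT_ID=logstash_a10
export KAFKA_GROUP_ID=logstash_a10_consumers
export CONSUMER_THREADS=2
export MAX_POLL_RECORDS=500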
- Execute the below to create placeholder GeoIP files (the geoip enrichment is disabled in this example):
touch /mnt/s3fs_geoip/GeoLite2-City.mmdb
touch /mnt/s3fs_geoip/GeoLitePrivate2-City.mmdb
- Create a settings.json file in the build_scripts directory with the below content. a10_proxy is the topic name in Kafka. The topic name should be the key of the parsing definition, and the log_source value should also be the same as the topic name. elastic_index is the Elasticsearch index the parsed logs will be written to.
{
  "a10_proxy": {
    "volume": "high",
    "config": "syslog_log_audit_a10.proxy",
    "elastic_index": "a10_proxy_audit_index",
    "ignore_enrichments": ["disable_geoip_enrichment"],
    "output_list": [
      "elastic_output"
    ],
    "kafka_input": {
      "codec": "json"
    }
  }
}
- Create a general.json file in the build_scripts directory with the below content.
{
  "num_indexers": 1,
  "prod_only_logs": [
  ],
  "processing_config": {
  }
}
- Set the environment variables as explained in the environment variables section of README.md. Replace all of the values below with your actual values.
export DEPLOY_ENV=test
export MY_INDEX='1'
export SUB_MY_IP=hostname_or_ip_without_dots_to_identify_instance
export ELASTIC_USER=your_elastic_user
export ELASTIC_PASSWORD=your_elastic_pass
export ELASTIC_CONNECTION_STRING='"127.0.0.1:9200", "127.0.0.2:9200"'
export KAFKA_CONNECTION_STRING=kafkahost:9000
export KAFKA_USER=your_kafka_uname
export KAFKA_PASSWORD=your_kafka_pwd
export RACK_ID=your_kafka_rack_id
export LOGSTASH_API_SECRET='{"memcached_address" : "\"127.0.0.1\",\"127.0.0.2\"", "dns_server" : "\"127.0.0.1\",\"127.0.0.2\""}'
- Run python build_scripts/generate_pipeline.py
- The script writes its logs to the /data directory and will fail if it cannot create that directory (see the commands below).
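To avoid that failure, make sure /data exists and is writable by the user running the script, for example:
sudo mkdir -p /data
sudo chown "$(whoami)" /data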
- Copy over the config directory to /usr/share/logstash.
- Start Logstash (see the example commands below).
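How you start Logstash depends on how it was installed; for a package install the service is usually managed by systemd, while for an archive install you can run the binary from the Logstash home. Verify these against your setup:
sudo systemctl start logstash
# or, running the binary directly
/usr/share/logstash/bin/logstash --path.settings /usr/share/logstash/config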
Detailed Setup
- Install Logstash with the required plugins, following the prerequisites section above.
- Clone the repo:
git clone https://github.com/Cargill/OpenSIEM-Logstash-Parsing
- Assuming your Logstash config directory is /usr/share/logstash/config, do:
cp -r OpenSIEM-Logstash-Parsing/config/* /usr/share/logstash/config/
- Cleanup: remove the existing (Kafka and Azure) input configs and the output configs. We'll create a file input for this example.
rm -rf /usr/share/logstash/config/inputs/*
rm -rf /usr/share/logstash/config/outputs/*
The dns enrichment needs a DNS server.
The geoip enrichment needs a GeoIP database file.
The memcache/misp enrichment needs a memcached server.
For the sake of simplicity, let's not use these. So do:
rm -f /usr/share/logstash/config/enrichments/09_dns.conf
rm -f /usr/share/logstash/config/enrichments/99_geoip.conf
rm -f /usr/share/logstash/config/enrichments/100_misp.conf
If you want to use these enrichments, you need to configure them so they can work, e.g. replace the variables in the dns and misp configs with actual values.
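For example, the DNS nameservers could be substituted in place with sed. The variable name below comes from the dns enrichment section above; double-check it against your copy of 09_dns.conf before running, and adjust the misp config similarly:
sed -i 's/VAR_DNS_SERVER/"127.0.0.1","127.0.0.2"/' /usr/share/logstash/config/enrichments/09_dns.conf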
- Let's create our input config. Open /usr/share/logstash/config/inputs/a10_input.conf in an editor and add the below.
input {
  file {
    path => "/tmp/a10_audit.log"
  }
}
filter {
  mutate {
    add_field => {
      "[@metadata][output_file]" => "a10_%{+xxxx.MM.dd}"
      "[@metadata][output_pipelines]" => [file_output]
    }
  }
}
output {
  pipeline { send_to => [a10_processor] }
}
Make sure the defined path contains A10 logs. This config creates an input source from the file path and adds two metadata fields specifying the output file name and the output pipelines. The output pipelines are the pipelines the logs are sent to after processing and enrichment, which comes in handy when we have multiple conditional outputs. After adding the metadata fields, this config forwards the event to the a10_processor pipeline.
- Update the processor file. Open /usr/share/logstash/config/processors/syslog_log_audit_a10.proxy.conf in an editor and replace VAR_PIPELINE_NAME with a10_processor. All processors forward events to the enrichments pipeline.
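If you prefer to do that replacement from the command line rather than an editor, a sed one-liner works just as well:
sed -i 's/VAR_PIPELINE_NAME/a10_processor/' /usr/share/logstash/config/processors/syslog_log_audit_a10.proxy.conf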
- Set up the enrichment output. The enrichment input 00_input.conf is defined as the enrichments pipeline, so the logs from the processors are received there. We have to configure the output part. Open 999_output.conf and replace the contents with the below:
output {
  if "file_output" in [@metadata][output_pipelines] {
    pipeline { send_to => "file_output" }
  }
}
- Create the output config. Create /usr/share/logstash/config/outputs/file_out.conf and add the below contents.
input {
  pipeline { address => file_output }
}
output {
  file {
    path => "/tmp/%{[@metadata][output_file]}"
  }
}
- Create pipelines.yml. Replace /usr/share/logstash/config/pipelines.yml with the below contents.
################# ENRICH #################
- pipeline.id: enrichments
  path.config: "/usr/share/logstash/config/enrichments/{*}.conf"
################# OUTPUT #################
- pipeline.id: output
  path.config: "/usr/share/logstash/config/outputs/file_out.conf"
############### INPUTS & PROCESSORS ###############
- pipeline.id: a10_input
  path.config: "/usr/share/logstash/config/inputs/a10_input.conf"
- pipeline.id: a10_processor
  path.config: "/usr/share/logstash/config/processors/syslog_log_audit_a10.proxy.conf"
- Start Logstash. After Logstash runs, you should see parsed logs in a file called /tmp/a10_2021.10.27, assuming you ran it on Oct 27, 2021. Note that this is because we defined the output file name to be generated with a date pattern by Logstash ("a10_%{+xxxx.MM.dd}" in the input conf file).
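As a quick smoke test you can append a line to the watched file and then look for the dated output file. The sample line below is a made-up placeholder, not a real A10 audit record, so expect parse failure tags on it; use a line from your actual A10 logs to see proper parsing:
echo 'placeholder a10 audit line' >> /tmp/a10_audit.log
ls -l /tmp/a10_*                   # the dated output file should appear after a few seconds
tail /tmp/a10_$(date +%Y.%m.%d)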