Learning Apache Storm

This project contains two Storm topologies for the ELK stack.

  • LogAnalyzer: ingests a log stream from general Logstash JSON output through Kafka into Elasticsearch.
  • ApLogAnalyzer: similar to LogAnalyzer, but ingests a log stream from customized Logstash JSON output through Kafka into Elasticsearch.

The overall data flow is shown in the following diagram:

log stream ==> Logstash ==> Kafka ==> Storm Topology ==> Elasticsearch

Before using this project, you must install the shaded jar for Elasticsearch 2.3.4 into your local Maven repository.

JDK 8 or above is required. You also have to install tools.jar first, which is normally located at $JAVA_HOME/lib/tools.jar.

mvn install:install-file -Dpackaging=jar -Dversion=1.8 -Dfile=tools.jar -DgeneratePom=true
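
The install command above assumes that tools.jar is present in the current directory. tools.jar ships with JDK 8 and earlier; JDK 9 and later no longer include it. As a minimal sketch, the check below locates the jar before Maven is invoked. The function name find_tools_jar is hypothetical and not part of this project.

```shell
# Hypothetical helper (not part of this project): print the path of
# tools.jar under a JDK home, or fail with a message if it is missing.
# The JDK home is taken from the first argument, else from $JAVA_HOME.
find_tools_jar() {
  local jdk_home="${1:-${JAVA_HOME:-}}"
  if [ -z "$jdk_home" ]; then
    echo "JAVA_HOME is not set" >&2
    return 1
  fi
  local jar="$jdk_home/lib/tools.jar"
  if [ ! -f "$jar" ]; then
    echo "tools.jar not found at $jar" >&2
    return 1
  fi
  printf '%s\n' "$jar"
}
```

For example, running cp "$(find_tools_jar)" . copies the jar into the current directory so that the -Dfile=tools.jar argument resolves.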


# 1. DELETE the old topic & indexes:
/usr/hdp/current/kafka-broker/bin/ --zookeeper hdpr01mgt:2181,hdpr01hn01:2181,hdpr01hn02:2181 --topic ap-log-v1 --delete

curl -XDELETE --user es_admin:password 'localhost:9200/aplog*?pretty'

# 1.1 Delete the topic znode on Zookeeper by zookeeper-client
# 1.2 Delete the topic commit logs
# 1.3 Create the Elasticsearch index template if needed

curl -XPUT -u es_admin:password "http://hdpr01wn01:9200/_template/aplog*?pretty=true" -d '
{
  "template": "aplog*",
  "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 5,
    "refresh_interval": "5s"
  },
  "mappings": {
    "*": {
      "_source": {
        "enabled": true
      },
      "properties": {
        "logTime": {
          "type": "date",
          "format": "strict_date_time||yyyy-MM-dd HH:mm:ss.SSS||strict_date_optional_time||epoch_millis"
        },
        "sysID": {
          "type": "string",
          "index": "not_analyzed"
        },
        "apID": {
          "type": "string",
          "index": "not_analyzed"
        },
        "functID": {
          "type": "string",
          "index": "not_analyzed"
        },
        "logType": {
          "type": "string",
          "index": "not_analyzed"
        },
        "who": {
          "type": "string",
          "index": "not_analyzed"
        },
        "reqFrom": {
          "type": "string",
          "index": "not_analyzed"
        },
        "reqAt": {
          "type": "string",
          "index": "not_analyzed"
        },
        "reqTo": {
          "type": "string",
          "index": "not_analyzed"
        },
        "reqAction": {
          "type": "string",
          "index": "not_analyzed"
        },
        "reqResult": {
          "type": "string",
          "index": "not_analyzed"
        },
        "msgLevel": {
          "type": "string",
          "index": "not_analyzed"
        },
        "msgCode": {
          "type": "string",
          "index": "not_analyzed"
        },
        "reqTable": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}'

# 2. RECREATE the topic:
/usr/hdp/current/kafka-broker/bin/ --zookeeper hdpr01mgt:2181,hdpr01hn01:2181,hdpr01hn02:2181 --topic ap-log-v1 --create --replication-factor 2 --partition 10 

# 3. START monitoring the topic:
/usr/hdp/current/kafka-broker/bin/ --zookeeper hdpr01mgt:2181 --topic ap-log-v1 --from-beginning

# 4. COMPILE & PACKAGE Storm topologies:
cd /root/workspace/LearnStorm/
mvn clean package -DskipTests

# 5. SUBMIT Storm topology: ApLogAnalyzer
storm jar target/LearnStorm-0.0.1-SNAPSHOT.jar com.pic.ala.ApLogAnalyzer

# 6. SUBMIT Storm topology: ApLogGenerator
storm jar target/LearnStorm-0.0.1-SNAPSHOT.jar com.pic.ala.ApLogGenerator

# 7. MONITOR the logs with Kibana
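
The index template in step 1.3 repeats the same not_analyzed string mapping for every field except logTime. As a minimal sketch, the loop below generates those entries from a list of field names, which may make the template easier to keep consistent when fields are added. The function name string_fields is hypothetical and not part of this project.

```shell
# Hypothetical helper (not part of this project): emit one not_analyzed
# string mapping per field name, separated by commas and newlines, ready
# to paste into the "properties" object of the index template.
string_fields() {
  local first=1
  local name
  for name in "$@"; do
    # Put a comma and line break between entries, but not before the first.
    if [ "$first" -eq 0 ]; then
      printf ',\n'
    fi
    printf '"%s": { "type": "string", "index": "not_analyzed" }' "$name"
    first=0
  done
  printf '\n'
}
```

For example, string_fields sysID apID functID prints three mapping entries, one per line, with commas between them and none after the last.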

2. Commands for Kafka Maintenance

/usr/hdp/current/kafka-broker/bin/ --zookeeper hdpr01mgt:2181 --group aplog-analyzer
/usr/hdp/current/kafka-broker/bin/ --zookeeper hdpr01mgt:2181 --topic ap-log-v1 --describe

3. References:

4. Misc


mvn compile exec:java -Dstorm.topology=com.pic.ala.learn.TestTopology.TestTridentTopology
mvn compile exec:java -Dstorm.topology=com.pic.ala.learn.TestTopology.TridentWordCount
mvn compile exec:java -Dstorm.topology=com.pic.ala.learn.TestTopology.TridentKafkaWordCount

storm jar target/TestTopology-0.0.1-SNAPSHOT.jar com.pic.ala.learn.TestTopology.TridentKafkaWordCount hdp01.localdomain:2181 hdp02.localdomain:6667