-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathStreamConsumerCoronavirus.py
73 lines (61 loc) · 2.31 KB
/
StreamConsumerCoronavirus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
Author: Anshul Pardhi
Streaming Kafka consumer that continuously collects filtered tweets,
performs sentiment analysis (TextBlob and VADER) and sends the results to Elasticsearch
"""
from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch
from textblob import TextBlob
from nltk.sentiment.vader import SentimentIntensityAnalyzer
# Module-level Elasticsearch client used by main() to index scored tweets.
# It is constructed with no arguments, so the client library's default
# connection settings apply.
es = Elasticsearch()
def _blob_label(polarity: float) -> str:
    """Map a TextBlob polarity score to "Positive", "Negative" or "Neutral"."""
    if polarity > 0:
        return "Positive"
    if polarity < 0:
        return "Negative"
    return "Neutral"


def _vader_label(compound: float) -> str:
    """Map a VADER compound score to a label using the +/-0.05 cut-offs."""
    if compound >= 0.05:
        return "Positive"
    if compound <= -0.05:
        return "Negative"
    return "Neutral"


def main():
    """
    Consume tweets from Kafka, score their sentiment and index them.

    Subscribes to the "twitter_coronavirus" topic and, for every message:
    decodes the payload as JSON, labels the tweet text with both TextBlob
    (polarity) and VADER (compound score), prints the scores, and indexes
    the author, date, text and both labels into the "tweet_coronavirus"
    Elasticsearch index through the module-level ``es`` client.

    Messages that are not valid JSON, or that lack the "text", "created_at"
    or user "screen_name" fields, are reported on stdout and skipped so that
    one bad record does not stop the consumer. Errors raised by Kafka or
    Elasticsearch themselves are not caught and propagate to the caller.

    Runs for as long as the consumer keeps yielding messages.
    """
    # set-up a Kafka consumer
    consumer = KafkaConsumer("twitter_coronavirus")

    # Build the VADER analyzer once: it is independent of the message being
    # processed, so re-creating it per tweet is wasted work.
    sid = SentimentIntensityAnalyzer()

    for msg in consumer:
        # Pull out every field we need up front so a malformed message is
        # rejected before anything is printed or indexed for it.
        try:
            dict_data = json.loads(msg.value)
            tweet = dict_data["text"]
            author = dict_data["user"]["screen_name"]
            created_at = dict_data["created_at"]
        except (ValueError, KeyError, TypeError) as err:
            # ValueError covers invalid JSON and undecodable bytes; KeyError
            # and TypeError cover payloads without the expected structure.
            print("Skipping malformed message: ", repr(err))
            continue

        # Create a blob
        tweet_blob = TextBlob(tweet)
        print(tweet_blob)  # Print the tweet text

        # Calculate blob sentiment
        print(format(tweet_blob.sentiment))
        blob_sentiment = _blob_label(tweet_blob.sentiment.polarity)

        # Calculate vader sentiment
        ss = sid.polarity_scores(tweet)
        for k in sorted(ss):
            print('{0}: {1}, '.format(k, ss[k]), end='')
        # Read the compound score directly instead of relying on the
        # printing loop above to have assigned the label as a side effect.
        vader_sentiment = _vader_label(ss["compound"])

        print()
        print("Blob Sentiment: ", blob_sentiment)
        print("Vader Sentiment: ", vader_sentiment)
        print()

        # add text and sentiment info to elasticsearch
        # NOTE(review): ``doc_type`` is kept as in the original call; newer
        # Elasticsearch clients deprecate it - confirm the deployed version.
        es.index(index="tweet_coronavirus",
                 doc_type="test-type",
                 body={"author": author,
                       "date": created_at,
                       "message": tweet,
                       "blob_sentiment": blob_sentiment,
                       "vader_sentiment": vader_sentiment})
        print('\n')
# Start the consumer loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()