-
Notifications
You must be signed in to change notification settings - Fork 1
/
tweets_retriever.py
110 lines (93 loc) · 3.79 KB
/
tweets_retriever.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import io
import os
import random
from collections import Counter

import tweepy
# Seed the module-level RNG so that the random.shuffle() call used when
# sub-sampling annotations produces the same order on every run.
random.seed(3)
# Twitter OAuth credentials passed to tweepy.OAuthHandler / set_access_token
# by TwitterRetriever. They are empty here; presumably they must be filled in
# before the script can authenticate.
# NOTE(review): consider reading these from the environment rather than
# keeping secrets in source.
CONSUMER_KEY = ""
CONSUMER_SECRET = ""
OAUTH_TOKEN = ""
OAUTH_TOKEN_SECRET = ""
def batches(it, size):
    """Yield successive lists of up to ``size`` items taken from ``it``.

    Args:
        it: any iterable; it is consumed lazily, one item at a time.
        size: number of items per batch. Every batch except possibly the
            last one has exactly this many items.

    Yields:
        Lists of items in their original order. Nothing is yielded for an
        empty iterable.

    Note:
        If ``size`` is smaller than 1 the "batch is full" check can never
        fire, so all items come out as one single final batch.
    """
    batch = []
    for item in it:
        batch.append(item)
        if len(batch) == size:
            yield batch
            # Start a fresh list so the batch just yielded is not mutated.
            batch = []
    # Flush the trailing, partially filled batch (if any).
    if batch:
        yield batch
class DatasetLang(object):
    """Non-neutral annotated tweet ids for one of the 13 language datasets.

    Only the annotations of the annotator who labelled the most non-neutral
    tweets are used. Tweets labelled "Neutral" are skipped, and at most
    ``max_tweets`` (15000 by default) randomly chosen tweets are kept.

    Attributes set by ``__init__``:
        lang: language tag supplied by the caller.
        id2polar: dict mapping tweet id (str) to polarity label (str).
        which_annotator: id of the annotator whose labels are used.
        max_annotations: number of non-neutral annotations made by that
            annotator, counted before the ``max_tweets`` cut.
    """

    def __init__(self, lang, csvpath, max_tweets=15000):
        """Load the annotations for ``lang`` from ``csvpath``.

        Args:
            lang: language tag, used only for the log message here.
            csvpath: path of the annotation CSV (see ``_parse_csv``).
            max_tweets: upper bound on the number of tweets kept; ``None``
                keeps all of them.

        Raises:
            ValueError: if the CSV has a malformed row or no non-neutral
                annotation at all.
        """
        self.lang = lang
        self.id2polar, self.which_annotator, self.max_annotations = (
            self._parse_csv(csvpath, max_tweets)
        )
        print("[INFO]: ! finish init the dataset object for {} !\n".format(lang))

    def _parse_csv(self, csvpath, max_tweets=15000):
        """Parse the CSV and pick the tweets of the busiest annotator.

        The first line is treated as a header and skipped. Every other
        non-blank line must hold exactly three comma-separated fields:
        tweet id, polarity label, annotator id.

        Returns:
            A tuple ``(id2polar, which_annotator, max_annotations)``.

        Raises:
            ValueError: on a row without exactly three fields, or when no
                non-neutral annotation is found.
        """
        by_annotator = {}
        with open(csvpath, "r") as f:
            next(f, None)  # skip the header row
            for line in f:
                line = line.strip()
                if not line:
                    # Tolerate blank lines (e.g. a trailing newline).
                    continue
                myid, polar, anno_id = line.split(",")
                if polar == "Neutral":
                    continue
                by_annotator.setdefault(anno_id, []).append((myid, polar))

        if not by_annotator:
            raise ValueError(
                "no non-neutral annotations found in {}".format(csvpath)
            )

        # max() returns the first maximal key in iteration order, i.e. the
        # same annotator a "first match wins" scan over the dict would pick.
        which_annotator = max(by_annotator, key=lambda k: len(by_annotator[k]))
        max_annotations = len(by_annotator[which_annotator])
        print("[INFO]: use results of annotator {}, with {} annotations.".format(
            which_annotator, max_annotations
        ))

        annos = by_annotator[which_annotator]
        # Shuffle before cutting so the kept subset is a random sample; the
        # module-level random generator is used.
        random.shuffle(annos)
        annos = annos[:max_tweets]
        labels = [polar for _, polar in annos]
        print("[INFO]: loading {} valid tweets.".format(len(annos)))
        print("[INFO]: tweets distribution {}".format(Counter(labels)))
        return dict(annos), which_annotator, max_annotations
class TwitterRetriever(object):
    """Thin wrapper around a tweepy API client for looking up tweets by id."""

    def __init__(self):
        """Build an authenticated tweepy client.

        The module-level CONSUMER_KEY / CONSUMER_SECRET / OAUTH_TOKEN /
        OAUTH_TOKEN_SECRET values are used as credentials. The client is
        created with ``wait_on_rate_limit`` and ``wait_on_rate_limit_notify``
        enabled, so it waits (and reports it) when the rate limit is hit.
        """
        auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
        self.twitter = tweepy.API(
            auth,
            wait_on_rate_limit=True,
            wait_on_rate_limit_notify=True,
        )

    def retrive_texts(self, tweet_ids):
        """Look up the statuses for ``tweet_ids`` in a single request.

        The (misspelled) method name is kept because callers use it.

        Args:
            tweet_ids: list of tweet ids for one lookup call.

        Returns:
            Whatever ``statuses_lookup`` returns (a list-like of status
            objects), or an empty list when no ids are given.
        """
        if not tweet_ids:
            # Nothing to look up: do not spend an API call on an empty query.
            return []
        return self.twitter.statuses_lookup(id_=tweet_ids)
class RetrievedData(object):
    """Download the tweet texts of one language and store them as TSV.

    Creating an instance runs the whole pipeline: the annotated tweet ids are
    loaded with ``DatasetLang``, their texts are fetched through
    ``TwitterRetriever`` and each batch is appended to
    ``OUTPATH/<lang>.txt`` as ``id<TAB>polarity<TAB>text`` lines.
    """

    # Directory that receives one "<lang>.txt" file per language.
    OUTPATH = "./retrieved_dataset/"

    def __init__(self, para):
        """Load the dataset and immediately retrieve and write all tweets.

        Args:
            para: sequence whose first item is the language tag and whose
                second item is the path of the annotation CSV.
        """
        self.lang, csvpath = para[0], para[1]
        self.num_written = 0  # running count of lines written so far
        self.identifiers = DatasetLang(self.lang, csvpath)
        self.retriever = TwitterRetriever()
        self.retrieve_it()

    def retrieve_it(self):
        """Fetch every known tweet id in batches and write each batch out."""
        all_ids = list(self.identifiers.id2polar)
        # 100 ids per request; per the Twitter API docs this is the
        # statuses/lookup maximum.
        for batch_ids in batches(all_ids, 100):
            statuses = self.retriever.retrive_texts(batch_ids)
            self.write_batch([(sta.id, sta.text) for sta in statuses])

    def write_batch(self, batch_id2txt):
        """Append one batch of ``(tweet_id, text)`` pairs to the output file.

        Pairs whose id is not in the loaded dataset are skipped. The file is
        written as UTF-8 text and the output directory is created on demand.

        Args:
            batch_id2txt: iterable of ``(tweet_id, text)`` pairs; ids are
                converted with ``str`` before the lookup.
        """
        print(len(batch_id2txt))
        out_dir = self.OUTPATH
        if out_dir and not os.path.isdir(out_dir):
            os.makedirs(out_dir)
        out_file = os.path.join(out_dir, "{}.txt".format(self.lang))
        id2polar = self.identifiers.id2polar
        # io.open with an explicit encoding writes real text (not a bytes
        # repr) and behaves the same on Python 2 and 3.
        with io.open(out_file, "a", encoding="utf-8") as f:
            for myid, mytext in batch_id2txt:
                myid = str(myid)
                if myid not in id2polar:
                    continue
                f.write(u"{}\t{}\t{}\n".format(myid, id2polar[myid], mytext))
                self.num_written += 1
        print("[INFO]: finish written {} tweets for {}".format(self.num_written, self.lang))
if __name__ == "__main__":
    def locate(x):
        """Return the path of the sentiment CSV for the language name *x*."""
        return "./dataset/{}_Twitter_sentiment.csv".format(x)

    # Constructing RetrievedData runs the full load / fetch / write pipeline,
    # here for the English dataset under the "eng" tag.
    myretrievers = RetrievedData(("eng", locate("English")))