forked from maxbbraun/trump2cash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathanalysis.py
261 lines (206 loc) · 9.19 KB
/
analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# -*- coding: utf-8 -*-
from os import getenv
from re import IGNORECASE
from re import compile
from re import escape

try:
    from urllib.parse import quote_plus  # Python 3
except ImportError:
    from urllib import quote_plus  # Python 2

from google.cloud import language
from requests import get

from logs import Logs
# The URL template for a GET request to the Wikidata SPARQL endpoint. The
# single "%s" placeholder receives the SPARQL query; the response is requested
# in JSON format.
# NOTE: the query text contains characters that are significant inside a URL
# (e.g. "+", "?", "{"), so it needs to be URL-encoded before substitution.
WIKIDATA_QUERY_URL = "https://query.wikidata.org/sparql?query=%s&format=JSON"
# A Wikidata SPARQL query template to find stock ticker symbols and other
# information for a company. The single "%s" placeholder receives the Freebase
# ID (MID) of the company. Each result row can carry the labels of the
# company, its owner, its parent, the ticker symbol and the exchange name.
MID_TO_TICKER_QUERY = (
'SELECT ?companyLabel ?ownerLabel ?parentLabel ?tickerLabel'
' ?exchangeNameLabel WHERE {'
' ?instance wdt:P646 "%s" .' # Company with specified Freebase ID.
' ?instance wdt:P156* ?company .' # Company may have restructured.
' { ?company p:P414 ?exchange }' # Company traded on exchange.
' UNION { ?company wdt:P127+ ?owner .' # Or company is owned by another.
' ?owner p:P414 ?exchange }' # And owner traded on exchange.
' UNION { ?company wdt:P749+ ?parent .' # Or company is a subsidiary.
' ?parent p:P414 ?exchange } .' # And parent traded on exchange.
' VALUES ?exchanges { wd:Q13677 wd:Q82059 }' # Whitelist NYSE and NASDAQ.
' ?exchange ps:P414 ?exchanges .' # Stock exchange is whitelisted.
' ?exchange pq:P249 ?ticker .' # Get ticker symbol.
' ?exchange ps:P414 ?exchangeName .' # Get name of exchange.
' SERVICE wikibase:label {'
' bd:serviceParam wikibase:language "en" .' # Use English labels.
' }'
'} GROUP BY ?companyLabel ?ownerLabel ?parentLabel ?tickerLabel'
' ?exchangeNameLabel')
class Analysis:
    """A helper for analyzing company data in text.

    Combines entity detection and sentiment analysis from the Cloud Natural
    Language client with Wikidata lookups to turn a tweet into a list of
    publicly traded companies, each annotated with a sentiment score.
    """

    def __init__(self, logs_to_cloud):
        """Sets up the logger and the Cloud Natural Language client.

        Args:
            logs_to_cloud: Forwarded to Logs as its to_cloud argument.
        """

        self.logs = Logs(name="analysis", to_cloud=logs_to_cloud)
        self.gcnl_client = language.Client()

    @staticmethod
    def _binding_value(binding, key):
        """Returns the "value" of a SPARQL result field or None if absent."""

        if key in binding:
            return binding[key]["value"]
        return None

    def get_company_data(self, mid):
        """Looks up stock ticker information for a company via its Freebase ID.

        Args:
            mid: The Freebase ID substituted into MID_TO_TICKER_QUERY.

        Returns:
            A list of dicts with the keys "name", "ticker" and "exchange"
            (each possibly None) plus "root" when the row names an owner or a
            parent company. Identical rows are only returned once. None is
            returned when the request yields no bindings.
        """

        query = MID_TO_TICKER_QUERY % mid
        bindings = self.make_wikidata_request(query)

        if not bindings:
            self.logs.debug("No company data found for MID: %s" % mid)
            return None

        # Collect the data from the response.
        datas = []
        for binding in bindings:
            data = {
                "name": self._binding_value(binding, "companyLabel"),
                "ticker": self._binding_value(binding, "tickerLabel"),
                "exchange": self._binding_value(binding, "exchangeNameLabel"),
            }

            # Owner or parent get turned into root; the owner wins when both
            # are present. The key is omitted when neither is known.
            root = (self._binding_value(binding, "ownerLabel") or
                    self._binding_value(binding, "parentLabel"))
            if root:
                data["root"] = root

            # Add to the list unless we already have the same entry.
            if data not in datas:
                datas.append(data)
            else:
                self.logs.warn("Skipping duplicate company data: %s" % data)

        return datas

    def find_companies(self, tweet):
        """Finds mentions of companies in a tweet.

        Args:
            tweet: A tweet dict as accepted by get_expanded_text().

        Returns:
            A list of company dicts as produced by get_company_data(), each
            extended with a "sentiment" key and unique by "ticker". The list
            is empty when no text could be extracted from the tweet.
        """

        # Use the text of the tweet with any mentions expanded to improve
        # entity detection.
        text = self.get_expanded_text(tweet)
        if not text:
            self.logs.error("Failed to get text from tweet: %s" % tweet)
            return []

        # Run entity detection.
        document = self.gcnl_client.document_from_text(text)
        entities = document.analyze_entities()
        self.logs.debug("Found entities: %s" %
                        self.entities_tostring(entities))

        # Collect all entities which are publicly traded companies, i.e.
        # entities which have a known stock ticker symbol.
        companies = []
        seen_tickers = set()

        # The sentiment is computed from the whole text and is therefore the
        # same for every company. Fetch it lazily and only once so that no
        # sentiment request is made when no company is found.
        sentiment = None
        have_sentiment = False

        for entity in entities:
            # Use the Freebase ID of the entity to find company data. Skip any
            # entity which doesn't have a Freebase ID.
            name = entity.name
            metadata = entity.metadata
            if "mid" not in metadata:
                self.logs.debug("No MID found for entity: %s" % name)
                continue
            mid = metadata["mid"]
            company_data = self.get_company_data(mid)

            # Skip any entity for which we can't find any company data.
            if not company_data:
                self.logs.debug("No company data found for entity: %s (%s)" %
                                (name, mid))
                continue
            self.logs.debug("Found company data: %s" % company_data)

            for company in company_data:
                # Extract and add a sentiment score.
                if not have_sentiment:
                    sentiment = self.get_sentiment(text)
                    have_sentiment = True
                self.logs.debug("Using sentiment for company: %s %s" %
                                (sentiment, company))
                company["sentiment"] = sentiment

                # Add the company to the list unless we already have the same
                # ticker.
                ticker = company["ticker"]
                if ticker not in seen_tickers:
                    seen_tickers.add(ticker)
                    companies.append(company)
                else:
                    self.logs.warn(
                        "Skipping company with duplicate ticker: %s" % company)

        return companies

    def get_expanded_text(self, tweet):
        """Retrieves the text from a tweet with any @mentions expanded to
        their full names.

        Args:
            tweet: A dict with a "text" key and an "entities" dict that holds
                a "user_mentions" list. Mentions lacking "screen_name" or
                "name" are skipped with a warning.

        Returns:
            The tweet text with every "@screen_name" (matched
            case-insensitively) replaced by the mention's name, or None when
            the tweet is malformed.
        """

        if (not tweet or "text" not in tweet or "entities" not in tweet or
                "user_mentions" not in tweet["entities"]):
            self.logs.error("Malformed tweet: %s" % tweet)
            return None

        text = tweet["text"]
        mentions = tweet["entities"]["user_mentions"]
        self.logs.debug("Using mentions: %s" % mentions)
        for mention in mentions:
            if "screen_name" not in mention or "name" not in mention:
                self.logs.warn("Malformed mention: %s" % mention)
                continue

            screen_name = "@%s" % mention["screen_name"]
            name = mention["name"]
            self.logs.debug("Expanding mention: %s %s" % (screen_name, name))

            # Match the handle literally and only as a whole handle, so that
            # "@foo" does not rewrite the beginning of "@foobar".
            pattern = compile(escape(screen_name) + r"(?!\w)", IGNORECASE)

            # Use a function as the replacement: a plain string would be
            # treated as a template in which backslashes and group references
            # inside the display name are interpreted.
            text = pattern.sub(lambda _match, name=name: name, text)

        return text

    def make_wikidata_request(self, query):
        """Makes a request to the Wikidata SPARQL API.

        Args:
            query: The SPARQL query text. It is URL-encoded before it is
                inserted into WIKIDATA_QUERY_URL.

        Returns:
            The list found under "results" -> "bindings" of the JSON
            response, or None when the response is not JSON or lacks either
            of these keys.
        """

        # The query has to be encoded: a raw "+" (used by the property paths
        # of the query) would otherwise be read as a space by the server.
        query_url = WIKIDATA_QUERY_URL % quote_plus(query)
        self.logs.debug("Wikidata query: %s" % query_url)

        response = get(query_url)
        try:
            response_json = response.json()
        except ValueError:
            self.logs.error("Failed to decode JSON response: %s" % response)
            return None
        self.logs.debug("Wikidata response: %s" % response_json)

        if "results" not in response_json:
            self.logs.error("No results in Wikidata response: %s" %
                            response_json)
            return None

        results = response_json["results"]
        if "bindings" not in results:
            self.logs.error("No bindings in Wikidata results: %s" % results)
            return None

        return results["bindings"]

    def entities_tostring(self, entities):
        """Converts a list of entities to a readable string."""

        tostrings = [self.entity_tostring(entity) for entity in entities]
        return "[%s]" % ", ".join(tostrings)

    def entity_tostring(self, entity):
        """Converts one entity to a readable string.

        Reads the name, entity_type, wikipedia_url, metadata, salience and
        mentions attributes of the entity.
        """

        if entity.wikipedia_url:
            wikipedia_url = '"%s"' % entity.wikipedia_url
        else:
            wikipedia_url = None

        # items() instead of iteritems() keeps this working on Python 3.
        metadata = ", ".join(['"%s": "%s"' % (key, value) for
                              key, value in entity.metadata.items()])

        mentions = ", ".join(['"%s"' % mention for mention in entity.mentions])

        return ('{name: "%s",'
                ' entity_type: "%s",'
                ' wikipedia_url: %s,'
                ' metadata: {%s},'
                ' salience: %s,'
                ' mentions: [%s]}') % (
            entity.name,
            entity.entity_type,
            wikipedia_url,
            metadata,
            entity.salience,
            mentions)

    def get_sentiment(self, text):
        """Extracts a sentiment score [-1, 1] from text.

        Returns:
            The score attribute of the sentiment reported for the text.
        """

        # TODO: Determine sentiment targeted at the specific entity.
        document = self.gcnl_client.document_from_text(text)
        sentiment = document.analyze_sentiment()

        self.logs.debug(
            "Sentiment score and magnitude for text: %s %s \"%s\"" %
            (sentiment.score, sentiment.magnitude, text))

        return sentiment.score