Skip to content
This repository has been archived by the owner on Sep 12, 2022. It is now read-only.

Commit

Permalink
Merge branch 'pulse-3.1.3.1-prep'
Browse files Browse the repository at this point in the history
  • Loading branch information
rdempsey committed Mar 24, 2017
2 parents bd320b7 + 3fda61d commit 8710d4e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 59 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ htmlcov/*
# Local testing files for Docker
build_full_environment.sh
docker-compose.build.yml

# Local environment things
.condaauto
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ deploy:
repo: istresearch/traptor

after_success:
- if [ "$TRAVIS_BRANCH" == "master" ] || [ $TRAVIS_BRANCH == "develop" ] || [ $TRAVIS_BRANCH == "pulse-3.1.3-prep" ]; then
- if [ "$TRAVIS_BRANCH" == "master" ] || [ $TRAVIS_BRANCH == "develop" ] || [ $TRAVIS_BRANCH == "pulse-3.1.3.1-prep" ]; then
docker login -e="$DOCKER_EMAIL" -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD";
docker build --build-arg BUILD_NUMBER=$TRAVIS_BUILD_NUMBER -t istresearch/traptor:$TRAVIS_BRANCH .;
docker push istresearch/traptor:$TRAVIS_BRANCH;
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ pytest-cov==2.2.1
pytest-xdist==1.14
raven==5.13.0
datadog==0.14.0
mockredispy==2.9.3
mockredispy==2.9.3
tenacity==4.0.1
2 changes: 0 additions & 2 deletions tests/test_traptor_offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from scripts.rule_extract import RulesToRedis
from scutils.log_factory import LogObject
from scutils.stats_collector import RollingTimeWindow
import scutils


@pytest.fixture()
Expand Down Expand Up @@ -392,7 +391,6 @@ def test_ensure_limit_message_counter_is_correctly_created(self, redis_rules, tr
l_key = "limit:{}:{}".format(traptor.traptor_type, traptor.traptor_id)
assert traptor.limit_counter.get_key() == l_key


# Main Loop

def test_main_loop(self, redis_rules, traptor, tweets):
Expand Down
191 changes: 137 additions & 54 deletions traptor/traptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@
import dateutil.parser as parser
import traceback

from redis import StrictRedis, ConnectionError
import redis
from kafka import KafkaProducer
from kafka.common import KafkaUnavailableError
from birdy.twitter import StreamClient, TwitterApiError
import dd_monitoring

import threading
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

from scutils.log_factory import LogFactory
from scutils.stats_collector import StatsCollector
from traptor_limit_counter import TraptorLimitCounter

import logging

logging.basicConfig(level='INFO')
FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level='INFO', format=FORMAT)

# Override the default JSONobject
class MyBirdyClient(StreamClient):
Expand Down Expand Up @@ -138,29 +140,34 @@ def _setup_birdy(self):
self.apikeys['ACCESS_TOKEN_SECRET']
)

def _setup_kafka(self):
""" Set up a Kafka connection.
@retry(
    retry=retry_if_exception_type(KafkaUnavailableError),
    wait=wait_exponential(multiplier=1, max=10),
    stop=stop_after_attempt(3),
)
def _create_kafka_producer(self):
    """Build the Kafka producer and store it on ``self.kafka_conn``.

    The construction is retried when a KafkaUnavailableError is raised,
    using an exponential wait (multiplier=1, max=10) and giving up after
    three attempts.
    """
    producer_settings = {
        'bootstrap_servers': self.kafka_hosts,
        # Outgoing values are serialized to JSON text before sending.
        'value_serializer': lambda m: json.dumps(m),
        'api_version': (0, 9),
        'reconnect_backoff_ms': 4000,
        'retries': 3,
        'linger_ms': 25,
        'buffer_memory': 4 * 1024 * 1024,
    }
    self.kafka_conn = KafkaProducer(**producer_settings)

Creates ``self.kafka_conn`` if it can reach the kafka brokers.
"""
def _setup_kafka(self):
""" Set up a Kafka connection."""
if self.kafka_enabled == 'true':
self.logger.info('Setting up kafka connection')
try:
self.kafka_conn = KafkaProducer(bootstrap_servers=self.kafka_hosts,
value_serializer=lambda m: json.dumps(m),
api_version=(0, 9),
reconnect_backoff_ms=4000,
retries=3,
linger_ms=25,
buffer_memory=4 * 1024 * 1024)
except KafkaUnavailableError as e:
self._create_kafka_producer()
except:
self.logger.critical("Caught Kafka Unavailable Error", extra={
'error_type': 'KafkaUnavailableError',
'ex': traceback.format_exc()
})
dd_monitoring.increment('kafka_error',
tags=['error_type:kafka_unavailable'])
sys.exit(3)
# sys.exit(3)
else:
self.logger.info('Skipping kafka connection setup')
self.logger.debug('Kafka_enabled setting: {}'.format(self.kafka_enabled))
Expand Down Expand Up @@ -213,6 +220,36 @@ def _setup(self):
# Create the locations_rule dict if this is a locations traptor
self.locations_rule = {}

@retry(wait=wait_exponential(multiplier=1, max=10),
       stop=stop_after_attempt(3),
       reraise=True,
       retry=retry_if_exception_type(TwitterApiError)
       )
def _create_twitter_follow_stream(self):
    """Create a Twitter follow stream.

    Posts ``self.twitter_rules`` as the ``follow`` parameter of the birdy
    statuses filter endpoint and stores the result on ``self.birdy_stream``.

    The call is retried on TwitterApiError with an exponential wait
    (multiplier=1, max=10) and stops after three attempts.

    :raises TwitterApiError: when the last attempt still fails.
        ``reraise=True`` makes the original error surface instead of
        tenacity's RetryError, so the ``except TwitterApiError`` handler in
        ``_create_birdy_stream`` still sees it.
    """
    self.logger.info('Creating birdy follow stream')
    self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(follow=self.twitter_rules,
                                                                    stall_warnings='true')

@retry(wait=wait_exponential(multiplier=1, max=10),
       stop=stop_after_attempt(3),
       reraise=True,
       retry=retry_if_exception_type(TwitterApiError)
       )
def _create_twitter_track_stream(self):
    """Create a Twitter track stream.

    Posts ``self.twitter_rules`` as the ``track`` parameter of the birdy
    statuses filter endpoint and stores the result on ``self.birdy_stream``.

    The call is retried on TwitterApiError with an exponential wait
    (multiplier=1, max=10) and stops after three attempts.

    :raises TwitterApiError: when the last attempt still fails.
        ``reraise=True`` makes the original error surface instead of
        tenacity's RetryError, so the ``except TwitterApiError`` handler in
        ``_create_birdy_stream`` still sees it.
    """
    # The log message names the track stream; it previously said "follow"
    # (copied from the follow-stream helper), which made logs misleading.
    self.logger.info('Creating birdy track stream')
    self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(track=self.twitter_rules,
                                                                    stall_warnings='true')

@retry(wait=wait_exponential(multiplier=1, max=10),
       stop=stop_after_attempt(3),
       reraise=True,
       retry=retry_if_exception_type(TwitterApiError)
       )
def _create_twitter_locations_stream(self):
    """Create a Twitter locations stream.

    Posts ``self.twitter_rules`` as the ``locations`` parameter of the birdy
    statuses filter endpoint and stores the result on ``self.birdy_stream``.

    The call is retried on TwitterApiError with an exponential wait
    (multiplier=1, max=10) and stops after three attempts.

    :raises TwitterApiError: when the last attempt still fails.
        ``reraise=True`` makes the original error surface instead of
        tenacity's RetryError, so the ``except TwitterApiError`` handler in
        ``_create_birdy_stream`` still sees it.
    """
    self.logger.info('Creating birdy locations stream')
    self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(locations=self.twitter_rules,
                                                                    stall_warnings='true')

def _create_birdy_stream(self):
""" Create a birdy twitter stream.
If there is a TwitterApiError it will exit with status code 3.
Expand All @@ -225,11 +262,9 @@ def _create_birdy_stream(self):
if self.traptor_type == 'follow':
# Try to set up a twitter stream using twitter id list
try:
self.logger.info('Creating birdy follow stream')
self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(follow=self.twitter_rules,
stall_warnings='true')
self._create_twitter_follow_stream()
except TwitterApiError as e:
self.logger.critical("Caught Twitter Api Error", extra = {
self.logger.critical("Caught Twitter Api Error creating follow stream", extra = {
'error_type': 'TwitterAPIError',
'ex': traceback.format_exc()
})
Expand All @@ -239,9 +274,7 @@ def _create_birdy_stream(self):
elif self.traptor_type == 'track':
# Try to set up a twitter stream using twitter term list
try:
self.logger.info('Creating birdy track stream')
self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(track=self.twitter_rules,
stall_warnings='true')
self._create_twitter_track_stream()
except TwitterApiError as e:
self.logger.critical("Caught Twitter Api Error", extra={
'error_type': 'TwitterAPIError',
Expand All @@ -253,9 +286,7 @@ def _create_birdy_stream(self):
elif self.traptor_type == 'locations':
# Try to set up a twitter stream using twitter term list
try:
self.logger.info('Creating birdy locations stream')
self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(locations=self.twitter_rules,
stall_warnings='true')
self._create_twitter_locations_stream()
except TwitterApiError as e:
self.logger.critical("Caught Twitter Api Error", extra={
'error_type': 'TwitterAPIError',
Expand Down Expand Up @@ -317,6 +348,11 @@ def _make_rule_counters(self):

self.rule_counters = rule_counters

@retry(wait=wait_exponential(multiplier=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
retry=retry_if_exception_type(redis.ConnectionError)
)
def _increment_rule_counter(self, tweet):
"""
Increment a rule counter.
Expand All @@ -330,8 +366,16 @@ def _increment_rule_counter(self, tweet):
self.rule_counters[rule_id] = self._create_rule_counter(rule_id=rule_id)

# If a rule value exists, increment the counter
if rule_id is not None and self.rule_counters[rule_id] is not None:
self.rule_counters[rule_id].increment()
try:
if rule_id is not None and self.rule_counters[rule_id] is not None:
self.rule_counters[rule_id].increment()
except:
self.logger.error("Caught exception while incrementing a rule counter", extra={
'error_type': 'RedisConnectionError',
'ex': traceback.format_exc()
})
dd_monitoring.increment('redis_error',
tags=['error_type:connection_error'])

def _make_limit_message_counter(self):
"""
Expand All @@ -343,14 +387,27 @@ def _make_limit_message_counter(self):
self.limit_counter = TraptorLimitCounter(key=limit_counter_key, window=collection_window)
self.limit_counter.setup(redis_conn=self.redis_conn)

@retry(wait=wait_exponential(multiplier=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
retry=retry_if_exception_type(redis.ConnectionError)
)
def _increment_limit_message_counter(self, limit_count):
"""
Increment the limit message counter
:param limit_count: the integer value from the limit message
"""
if self.limit_counter is not None:
self.limit_counter.increment(limit_count=limit_count)
try:
if self.limit_counter is not None:
self.limit_counter.increment(limit_count=limit_count)
except:
self.logger.error("Caught exception while incrementing a limit counter", extra={
'error_type': 'RedisConnectionError',
'ex': traceback.format_exc()
})
dd_monitoring.increment('redis_error',
tags=['error_type:connection_error'])

def _get_locations_traptor_rule(self):
"""
Expand Down Expand Up @@ -645,6 +702,11 @@ def _find_rule_matches(self, tweet_dict):

return new_dict

@retry(wait=wait_exponential(multiplier=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
retry=retry_if_exception_type(redis.ConnectionError)
)
def _get_redis_rules(self):
""" Yields a traptor rule from redis. This function
expects that the redis keys are set up like follows:
Expand Down Expand Up @@ -695,14 +757,13 @@ def _get_redis_rules(self):
yield redis_rule
self.logger.debug('Index: {0}, Redis_rule: {1}'.format(
idx, redis_rule))
except ConnectionError as e:
except:
self.logger.critical("Caught exception while connecting to Redis", extra={
'error_type': 'ConnectionError',
'error_type': 'RedisConnectionError',
'ex': traceback.format_exc()
})
dd_monitoring.increment('redis_error',
tags=['error_type:connection_error'])
sys.exit(3) # Special error code to track known failures

@staticmethod
def _tweet_time_to_iso(tweet_time):
Expand Down Expand Up @@ -832,6 +893,11 @@ def _check_redis_pubsub_for_restart(self):
self.logger.debug("Redis PubSub message found. Setting restart flag to True.")
dd_monitoring.increment('restart_message_received')

@retry(wait=wait_exponential(multiplier=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
retry=retry_if_exception_type(redis.ConnectionError)
)
def _add_heartbeat_message_to_redis(self,
heartbeat_conn):
"""Add a heartbeat message to Redis."""
Expand All @@ -841,17 +907,9 @@ def _add_heartbeat_message_to_redis(self,
self.traptor_id,
now)
message = "alive"
try:
dd_monitoring.increment('heartbeat_message_sent_success')
return heartbeat_conn.setex(key_to_add, time_to_live, message)
except ConnectionError as e:
self.logger.error("Caught exception while adding the heartbeat message to Redis", extra={
'error_type': 'ConnectionError',
'ex': traceback.format_exc()
})
dd_monitoring.increment('heartbeat_message_sent_failure',
tags=['error_type:redis_connection_error'])
raise

dd_monitoring.increment('heartbeat_message_sent_success')
return heartbeat_conn.setex(key_to_add, time_to_live, message)

def _send_heartbeat_message(self):
"""Add an expiring key to Redis as a heartbeat on a timed basis."""
Expand All @@ -860,9 +918,38 @@ def _send_heartbeat_message(self):

# while Traptor is running, add a heartbeat message every 5 seconds
while True:
self._add_heartbeat_message_to_redis(self.heartbeat_conn)
try:
self._add_heartbeat_message_to_redis(self.heartbeat_conn)
except Exception:
self.logger.error("Caught exception while adding the heartbeat message to Redis", extra={
'error_type': 'RedisConnectionError',
'ex': traceback.format_exc()
})
dd_monitoring.increment('heartbeat_message_sent_failure',
tags=['error_type:redis_connection_error'])
raise

time.sleep(hb_interval)

@retry(
    retry=retry_if_exception_type(KafkaUnavailableError),
    wait=wait_exponential(multiplier=1, max=10),
    stop=stop_after_attempt(3),
    reraise=True,
)
def _send_enriched_data_to_kafka(self, tweet, enriched_data):
    """
    Hand the enriched data to the Kafka producer and attach result callbacks.

    Retried on KafkaUnavailableError (exponential wait, three attempts at
    most); the original exception is re-raised once attempts run out.

    :param tweet: the original tweet; its ``id_str`` is logged and the tweet
                  is passed along to the success callback
    :param enriched_data: the enriched data sent to ``self.kafka_topic``
    """
    log_context = {'tweet_id': tweet.get('id_str', None)}
    self.logger.info("Attempting to send tweet to kafka", extra=log_context)

    # send() returns a future-like object; the outcome is reported through
    # the success/failure callbacks registered below.
    send_result = self.kafka_conn.send(self.kafka_topic, enriched_data)
    send_result.add_callback(self.kafka_success_callback, tweet)
    send_result.add_errback(self.kafka_failure_callback)

def _main_loop(self):
"""
Main loop for iterating through the twitter data.
Expand All @@ -888,14 +975,10 @@ def _main_loop(self):
enriched_data = self._enrich_tweet(tweet)

if self.kafka_enabled == 'true':

try:
self.logger.info("Attempting to send tweet to kafka", extra={
'tweet_id': tweet.get('id_str', None)
})
future = self.kafka_conn.send(self.kafka_topic, enriched_data)
future.add_callback(self.kafka_success_callback, tweet)
future.add_errback(self.kafka_failure_callback)
except Exception:
self._send_enriched_data_to_kafka(tweet, enriched_data)
except:
self.logger.error("Caught exception adding Twitter message to Kafka", extra={
'ex': traceback.format_exc()
})
Expand Down Expand Up @@ -984,18 +1067,18 @@ def main():
redis_port = int(os.getenv('REDIS_PORT', 6379))
redis_db = int(os.getenv('REDIS_DB', 5))

redis_conn = StrictRedis(host=redis_host,
redis_conn = redis.StrictRedis(host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True)

# Redis pubsub connection
pubsub_conn = StrictRedis(host=redis_host,
pubsub_conn = redis.StrictRedis(host=redis_host,
port=redis_port,
db=redis_db)

# Redis heartbeat connection
heartbeat_conn = StrictRedis(host=redis_host,
heartbeat_conn = redis.StrictRedis(host=redis_host,
port=redis_port,
db=redis_db)

Expand Down
2 changes: 1 addition & 1 deletion traptor/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '1.4.5'
__version__ = '1.4.6'

if __name__ == '__main__':
print(__version__)

0 comments on commit 8710d4e

Please sign in to comment.