diff --git a/.gitignore b/.gitignore index 9f6bcf4..2477c06 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,6 @@ htmlcov/* # Local testing files for Docker build_full_environment.sh docker-compose.build.yml + +# Local environment things +.condaauto \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index a0e41f8..786f349 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ deploy: repo: istresearch/traptor after_success: - - if [ "$TRAVIS_BRANCH" == "master" ] || [ $TRAVIS_BRANCH == "develop" ] || [ $TRAVIS_BRANCH == "pulse-3.1.3-prep" ]; then + - if [ "$TRAVIS_BRANCH" == "master" ] || [ $TRAVIS_BRANCH == "develop" ] || [ $TRAVIS_BRANCH == "pulse-3.1.3.1-prep" ]; then docker login -e="$DOCKER_EMAIL" -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD"; docker build --build-arg BUILD_NUMBER=$TRAVIS_BUILD_NUMBER -t istresearch/traptor:$TRAVIS_BRANCH .; docker push istresearch/traptor:$TRAVIS_BRANCH; diff --git a/requirements.txt b/requirements.txt index f5836da..b5da160 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,5 @@ pytest-cov==2.2.1 pytest-xdist==1.14 raven==5.13.0 datadog==0.14.0 -mockredispy==2.9.3 \ No newline at end of file +mockredispy==2.9.3 +tenacity==4.0.1 \ No newline at end of file diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py index c55d0b5..73c4a72 100644 --- a/tests/test_traptor_offline.py +++ b/tests/test_traptor_offline.py @@ -14,7 +14,6 @@ from scripts.rule_extract import RulesToRedis from scutils.log_factory import LogObject from scutils.stats_collector import RollingTimeWindow -import scutils @pytest.fixture() @@ -392,7 +391,6 @@ def test_ensure_limit_message_counter_is_correctly_created(self, redis_rules, tr l_key = "limit:{}:{}".format(traptor.traptor_type, traptor.traptor_id) assert traptor.limit_counter.get_key() == l_key - # Main Loop def test_main_loop(self, redis_rules, traptor, tweets): diff --git a/traptor/traptor.py b/traptor/traptor.py index 791db89..867c3b5 100644 
--- a/traptor/traptor.py +++ b/traptor/traptor.py @@ -9,13 +9,14 @@ import dateutil.parser as parser import traceback -from redis import StrictRedis, ConnectionError +import redis from kafka import KafkaProducer from kafka.common import KafkaUnavailableError from birdy.twitter import StreamClient, TwitterApiError import dd_monitoring import threading +from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type from scutils.log_factory import LogFactory from scutils.stats_collector import StatsCollector @@ -23,7 +24,8 @@ import logging -logging.basicConfig(level='INFO') +FORMAT = '%(asctime)-15s %(message)s' +logging.basicConfig(level='INFO', format=FORMAT) # Override the default JSONobject class MyBirdyClient(StreamClient): @@ -138,29 +140,34 @@ def _setup_birdy(self): self.apikeys['ACCESS_TOKEN_SECRET'] ) - def _setup_kafka(self): - """ Set up a Kafka connection. + @retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + retry=retry_if_exception_type(KafkaUnavailableError) + ) + def _create_kafka_producer(self): + """Create the Kafka producer""" + self.kafka_conn = KafkaProducer(bootstrap_servers=self.kafka_hosts, + value_serializer=lambda m: json.dumps(m), + api_version=(0, 9), + reconnect_backoff_ms=4000, + retries=3, + linger_ms=25, + buffer_memory=4 * 1024 * 1024) - Creates ``self.kafka_conn`` if it can reach the kafka brokers. 
- """ + def _setup_kafka(self): + """ Set up a Kafka connection.""" if self.kafka_enabled == 'true': self.logger.info('Setting up kafka connection') try: - self.kafka_conn = KafkaProducer(bootstrap_servers=self.kafka_hosts, - value_serializer=lambda m: json.dumps(m), - api_version=(0, 9), - reconnect_backoff_ms=4000, - retries=3, - linger_ms=25, - buffer_memory=4 * 1024 * 1024) - except KafkaUnavailableError as e: + self._create_kafka_producer() + except: self.logger.critical("Caught Kafka Unavailable Error", extra={ 'error_type': 'KafkaUnavailableError', 'ex': traceback.format_exc() }) dd_monitoring.increment('kafka_error', tags=['error_type:kafka_unavailable']) - sys.exit(3) + # sys.exit(3) else: self.logger.info('Skipping kafka connection setup') self.logger.debug('Kafka_enabled setting: {}'.format(self.kafka_enabled)) @@ -213,6 +220,36 @@ def _setup(self): # Create the locations_rule dict if this is a locations traptor self.locations_rule = {} + @retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + retry=retry_if_exception_type(TwitterApiError) + ) + def _create_twitter_follow_stream(self): + """Create a Twitter follow stream.""" + self.logger.info('Creating birdy follow stream') + self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(follow=self.twitter_rules, + stall_warnings='true') + + @retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + retry=retry_if_exception_type(TwitterApiError) + ) + def _create_twitter_track_stream(self): + """Create a Twitter follow stream.""" + self.logger.info('Creating birdy follow stream') + self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(track=self.twitter_rules, + stall_warnings='true') + + @retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + retry=retry_if_exception_type(TwitterApiError) + ) + def _create_twitter_locations_stream(self): + """Create a Twitter locations stream.""" + self.logger.info('Creating 
birdy locations stream') + self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(locations=self.twitter_rules, + stall_warnings='true') + def _create_birdy_stream(self): """ Create a birdy twitter stream. If there is a TwitterApiError it will exit with status code 3. @@ -225,11 +262,9 @@ def _create_birdy_stream(self): if self.traptor_type == 'follow': # Try to set up a twitter stream using twitter id list try: - self.logger.info('Creating birdy follow stream') - self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(follow=self.twitter_rules, - stall_warnings='true') + self._create_twitter_follow_stream() except TwitterApiError as e: - self.logger.critical("Caught Twitter Api Error", extra = { + self.logger.critical("Caught Twitter Api Error creating follow stream", extra = { 'error_type': 'TwitterAPIError', 'ex': traceback.format_exc() }) @@ -239,9 +274,7 @@ def _create_birdy_stream(self): elif self.traptor_type == 'track': # Try to set up a twitter stream using twitter term list try: - self.logger.info('Creating birdy track stream') - self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(track=self.twitter_rules, - stall_warnings='true') + self._create_twitter_track_stream() except TwitterApiError as e: self.logger.critical("Caught Twitter Api Error", extra={ 'error_type': 'TwitterAPIError', @@ -253,9 +286,7 @@ def _create_birdy_stream(self): elif self.traptor_type == 'locations': # Try to set up a twitter stream using twitter term list try: - self.logger.info('Creating birdy locations stream') - self.birdy_stream = self.birdy_conn.stream.statuses.filter.post(locations=self.twitter_rules, - stall_warnings='true') + self._create_twitter_locations_stream() except TwitterApiError as e: self.logger.critical("Caught Twitter Api Error", extra={ 'error_type': 'TwitterAPIError', @@ -317,6 +348,11 @@ def _make_rule_counters(self): self.rule_counters = rule_counters + @retry(wait=wait_exponential(multiplier=1, max=10), + 
stop=stop_after_attempt(3), + reraise=True, + retry=retry_if_exception_type(redis.ConnectionError) + ) def _increment_rule_counter(self, tweet): """ Increment a rule counter. @@ -330,8 +366,16 @@ def _increment_rule_counter(self, tweet): self.rule_counters[rule_id] = self._create_rule_counter(rule_id=rule_id) # If a rule value exists, increment the counter - if rule_id is not None and self.rule_counters[rule_id] is not None: - self.rule_counters[rule_id].increment() + try: + if rule_id is not None and self.rule_counters[rule_id] is not None: + self.rule_counters[rule_id].increment() + except Exception: + self.logger.error("Caught exception while incrementing a rule counter", extra={ + 'error_type': 'RedisConnectionError', + 'ex': traceback.format_exc() + }) + dd_monitoring.increment('redis_error', + tags=['error_type:connection_error']) def _make_limit_message_counter(self): """ @@ -343,14 +387,27 @@ def _make_limit_message_counter(self): self.limit_counter = TraptorLimitCounter(key=limit_counter_key, window=collection_window) self.limit_counter.setup(redis_conn=self.redis_conn) + @retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + retry=retry_if_exception_type(redis.ConnectionError) + ) def _increment_limit_message_counter(self, limit_count): """ Increment the limit message counter :param limit_count: the integer value from the limit message """ - if self.limit_counter is not None: - self.limit_counter.increment(limit_count=limit_count) + try: + if self.limit_counter is not None: + self.limit_counter.increment(limit_count=limit_count) + except Exception: + self.logger.error("Caught exception while incrementing a limit counter", extra={ + 'error_type': 'RedisConnectionError', + 'ex': traceback.format_exc() + }) + dd_monitoring.increment('redis_error', + tags=['error_type:connection_error']) def _get_locations_traptor_rule(self): """ @@ -645,6 +702,11 @@ def _find_rule_matches(self, tweet_dict): return new_dict + 
@retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + retry=retry_if_exception_type(redis.ConnectionError) + ) def _get_redis_rules(self): """ Yields a traptor rule from redis. This function expects that the redis keys are set up like follows: @@ -695,14 +757,13 @@ def _get_redis_rules(self): yield redis_rule self.logger.debug('Index: {0}, Redis_rule: {1}'.format( idx, redis_rule)) - except ConnectionError as e: + except Exception: self.logger.critical("Caught exception while connecting to Redis", extra={ - 'error_type': 'ConnectionError', + 'error_type': 'RedisConnectionError', 'ex': traceback.format_exc() }) dd_monitoring.increment('redis_error', tags=['error_type:connection_error']) - sys.exit(3) # Special error code to track known failures @staticmethod def _tweet_time_to_iso(tweet_time): @@ -832,6 +893,11 @@ def _check_redis_pubsub_for_restart(self): self.logger.debug("Redis PubSub message found. Setting restart flag to True.") dd_monitoring.increment('restart_message_received') + @retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + retry=retry_if_exception_type(redis.ConnectionError) + ) def _add_heartbeat_message_to_redis(self, heartbeat_conn): """Add a heartbeat message to Redis.""" @@ -841,17 +907,9 @@ def _add_heartbeat_message_to_redis(self, self.traptor_id, now) message = "alive" - try: - dd_monitoring.increment('heartbeat_message_sent_success') - return heartbeat_conn.setex(key_to_add, time_to_live, message) - except ConnectionError as e: - self.logger.error("Caught exception while adding the heartbeat message to Redis", extra={ - 'error_type': 'ConnectionError', - 'ex': traceback.format_exc() - }) - dd_monitoring.increment('heartbeat_message_sent_failure', - tags=['error_type:redis_connection_error']) - raise + + dd_monitoring.increment('heartbeat_message_sent_success') + return heartbeat_conn.setex(key_to_add, time_to_live, message) def _send_heartbeat_message(self): 
"""Add an expiring key to Redis as a heartbeat on a timed basis.""" @@ -860,9 +918,38 @@ def _send_heartbeat_message(self): # while Traptor is running, add a heartbeat message every 5 seconds while True: - self._add_heartbeat_message_to_redis(self.heartbeat_conn) + try: + self._add_heartbeat_message_to_redis(self.heartbeat_conn) + except Exception: + self.logger.error("Caught exception while adding the heartbeat message to Redis", extra={ + 'error_type': 'RedisConnectionError', + 'ex': traceback.format_exc() + }) + dd_monitoring.increment('heartbeat_message_sent_failure', + tags=['error_type:redis_connection_error']) + raise + time.sleep(hb_interval) + @retry(wait=wait_exponential(multiplier=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + retry=retry_if_exception_type(KafkaUnavailableError) + ) + def _send_enriched_data_to_kafka(self, tweet, enriched_data): + """" + Send the enriched data to Kafka + + :param tweet: the original tweet + :param enriched_data: the enriched data to send + """ + self.logger.info("Attempting to send tweet to kafka", extra={ + 'tweet_id': tweet.get('id_str', None) + }) + future = self.kafka_conn.send(self.kafka_topic, enriched_data) + future.add_callback(self.kafka_success_callback, tweet) + future.add_errback(self.kafka_failure_callback) + def _main_loop(self): """ Main loop for iterating through the twitter data. 
@@ -888,14 +975,10 @@ def _main_loop(self): enriched_data = self._enrich_tweet(tweet) if self.kafka_enabled == 'true': + try: - self.logger.info("Attempting to send tweet to kafka", extra={ - 'tweet_id': tweet.get('id_str', None) - }) - future = self.kafka_conn.send(self.kafka_topic, enriched_data) - future.add_callback(self.kafka_success_callback, tweet) - future.add_errback(self.kafka_failure_callback) - except Exception: + self._send_enriched_data_to_kafka(tweet, enriched_data) + except Exception: self.logger.error("Caught exception adding Twitter message to Kafka", extra={ 'ex': traceback.format_exc() }) @@ -984,18 +1067,18 @@ def main(): redis_port = int(os.getenv('REDIS_PORT', 6379)) redis_db = int(os.getenv('REDIS_DB', 5)) - redis_conn = StrictRedis(host=redis_host, + redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True) # Redis pubsub connection - pubsub_conn = StrictRedis(host=redis_host, + pubsub_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db) # Redis heartbeat connection - heartbeat_conn = StrictRedis(host=redis_host, + heartbeat_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db) diff --git a/traptor/version.py b/traptor/version.py index 44c2664..e13964e 100644 --- a/traptor/version.py +++ b/traptor/version.py @@ -1,4 +1,4 @@ -__version__ = '1.4.5' +__version__ = '1.4.6' if __name__ == '__main__': print(__version__)