From e48fea6d9135abbd8721173a32624285d1ec946d Mon Sep 17 00:00:00 2001
From: pklene96
Date: Wed, 17 Mar 2021 16:24:12 -0400
Subject: [PATCH 01/14] Filtering Twitter Data

---
 requirements.txt              |   1 +
 tests/test_traptor_offline.py |  89 +++++++++++++++++++++++++++++-
 traptor/settings.py           |   6 ++
 traptor/traptor.py            | 100 +++++++++++++++++++++++++++++-----
 4 files changed, 180 insertions(+), 16 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index c23d488..3ee46ba 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,3 +23,4 @@ uwsgi==2.0.15
 pykafka==2.6.0
 gevent==1.2.2
 Werkzeug==0.16.1
+token-bucket==0.2.0
diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py
index 4b0f166..d443614 100644
--- a/tests/test_traptor_offline.py
+++ b/tests/test_traptor_offline.py
@@ -1,9 +1,11 @@
 """Traptor unit tests."""
 # To run with autotest and coverage and print all output to console run:
 #   py.test -s --cov=traptor --looponfail tests/
-
+from datetime import datetime, timedelta
 import os
 import json
+
+import token_bucket
 from redis import StrictRedis, ConnectionError
 import pytest
 from mock import MagicMock
@@ -53,7 +55,7 @@ def pubsub_conn():
 def traptor_notify_channel():
     """Create a traptor notification channel."""
     return getAppParamStr(
-            'REDIS_PUBSUB_CHANNEL', 'traptor-notify'
+        'REDIS_PUBSUB_CHANNEL', 'traptor-notify'
     )
 
@@ -604,7 +606,7 @@ def test_connection_retry(self, traptor):
             TwitterApiError("api error 2"),
             TwitterApiError("api error 3"),
             TwitterApiError("api error 4"),
-            None # Finally succeed
+            None  # Finally succeed
         ])
 
         try:
@@ -625,3 +627,84 @@ def test_connection_retry(self, traptor):
             pass
 
         assert traptor._create_twitter_locations_stream.call_count == 1
+
+    def test_token_bucket(self):
+        storage = token_bucket.MemoryStorage()
+        limiter = token_bucket.Limiter(10, 10, storage)
+        for i in range(30):
+            if limiter.consume('key'):
+                print("Write to kafka")
+            else:
+                print("Filter")
+            import time
+            time.sleep(.05)
+
+    def test_is_filtered_one_rule_value(self, traptor):
+        enriched_data = {
+            "traptor": {
+                "rule_value": "air force",
+            }
+        }
+        for i in range(100):
+            if not traptor._is_filtered(enriched_data):
+                print("Write to Kafka")
+            else:
+                print("Filter")
+            import time
+            time.sleep(.01)
+        print(len(traptor.twitter_rate["air force"]))
+        print(len(traptor.kafka_rate["air force"]))
+
+    def test_is_filtered_dummy(self, traptor):
+        enriched_data = {
+            "traptor": {
+                "rule_value": "air force",
+            }
+        }
+
+        twitter_rate = dict()
+        kafka_rate = dict()
+        rate_limiter = dict()
+        rule_last_seen = dict()
+
+        key = enriched_data['traptor']['rule_value']
+        rule_last_seen[key] = datetime.now()
+
+        traptor.logger = MagicMock()
+        # check only if received tweet
+        if key in rule_last_seen:
+            value = rule_last_seen[key]
+            upper_bound = datetime.now()
+            lower_bound = upper_bound - timedelta(minutes=2)
+
+            # thread interval every 2 minutes or function in traptor every 10,000 tweets
+            # function filter_maintenance
+            for key, value in rule_last_seen:
+
+                if upper_bound >= value >= lower_bound:
+                    print('Last seen less than 2 minutes ago')
+                    print(rule_last_seen)
+                else:
+                    print('Last seen longer than 2 minutes ago, drop it')  # debug
+                    del rule_last_seen[key], twitter_rate[key], kafka_rate[key], rate_limiter[key]
+                    print(rule_last_seen)
+        else:
+            rule_last_seen[key] = datetime.now()
+
+        if key not in rate_limiter:
+            storage = token_bucket.MemoryStorage()
+            limiter = token_bucket.Limiter(10, 10, storage)
+            rate_limiter[key] = limiter
+        twitter_rate[key] = "Twitter Rate"  # every tweet
+        kafka_rate[key] = "Kafka Rate"  # won't know until after consume; only ones not filtered will be recorded here
+        rule_last_seen[key] = datetime.now()
+
+        # How to get rates, track yourself
+        # rate_limiter[key].consume()  # check boolean result, if false then filter.
+
+        # create key, first timestamp, create new token bucket
+        # True if token bucket
+        for i in range(100):
+            traptor._is_filtered(enriched_data)
+        # number of trues match behavior expected
+        assert False
diff --git a/traptor/settings.py b/traptor/settings.py
index 455c518..3c1f4f7 100644
--- a/traptor/settings.py
+++ b/traptor/settings.py
@@ -46,6 +46,12 @@
     'ACCESS_TOKEN_SECRET': ""
 }
 
+# Rate Limiter
+RATE_LIMITING_ENABLED = bool(os.getenv('RATE_LIMITING_ENABLED', 'False') == 'True')
+RATE_LIMITING_RATE_SEC = max(1.0, float(os.getenv('RATE_LIMITING_RATE_SEC', 10.0)))
+RATE_LIMITING_CAPACITY = max(1, int(os.getenv('RATE_LIMITING_CAPACITY', 10)))
+RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC')))
+
 # Manager API
 API_PORT = os.getenv('API_PORT', 5000)
diff --git a/traptor/traptor.py b/traptor/traptor.py
index f94d6a6..fe6987d 100644
--- a/traptor/traptor.py
+++ b/traptor/traptor.py
@@ -6,13 +6,16 @@
 import os
 import time
 import random
-from datetime import datetime
+from collections import deque
+from datetime import datetime, timedelta
 # noinspection PyPackageRequirements
 import dateutil.parser as parser
 import traceback
 import threading
 
 import redis
+import token_bucket
+
 import dd_monitoring
 
 import six
@@ -266,6 +269,16 @@ def __init__(
         self.exit_event = threading.Event()
         self.exit = False
 
+        # three in-memory dicts keyed by rule value (up to 400 rule values; keys must be removed or memory will leak)
+        # Map of rule value -> list of timestamps
+        self.twitter_rate = dict()
+        # Map of rule value -> list of kafka rate
+        self.kafka_rate = dict()
+        # Map of rule value -> token bucket
+        self.rate_limiter = dict()
+
+        self.last_filter_maintenance = 0
+
         def sigterm_handler(_signo, _stack_frame):
             self._exit()
@@ -1270,6 +1283,55 @@ def _send_enriched_data_to_kafka(self, tweet, enriched_data):
         future.add_callback(self.kafka_success_callback, tweet)
         future.add_errback(self.kafka_failure_callback)
 
+    def _filter_maintenance(self, expiration_age_sec=120):
+        expiration_time = time.time() - expiration_age_sec
+        for key, value in self.twitter_rate:
+            if value[-1] <= expiration_time:
+                del self.kafka_rate[key], self.rate_limiter[key], self.twitter_rate[key]
+            while value and value[0] <= expiration_time:
+                value.popleft()
+        for key, value in self.kafka_rate:
+            while value and value[0] <= expiration_time:
+                value.popleft()
+
+    def _log_rates(self):
+        for key, value in self.twitter_rate:
+            # Edge cases
+            tps = len(value) / (value[-1] - value[0]) #edge cases
+            self.logger.info("Twitter Rate", extra=logExtra({
+                'rule_value': key,
+                'tps': tps
+            }))
+        for key, value in self.kafka_rate:
+            # Edge cases
+            tps = len(value) / (value[-1] - value[0]) #edge cases
+            self.logger.info("Kafka Rate", extra=logExtra({
+                'rule_value': key,
+                'tps': tps
+            }))
+
+    def _is_filtered(self, enriched_data):
+        # set of keys -> rule_values
+        rule_values = set()
+        rule_values.add(enriched_data['traptor']['rule_value'])  # TODO look for more rule values
+        filtered = True
+        for key in rule_values:
+            if key not in self.rate_limiter:
+                storage = token_bucket.MemoryStorage()
+                limiter = token_bucket.Limiter(settings.RATE_LIMITING_RATE_SEC, settings.RATE_LIMITING_CAPACITY, storage)
+                self.rate_limiter[key] = limiter
+            # tail - head / number of elements
+            if key not in self.twitter_rate:
+                self.twitter_rate[key] = deque()
+            self.twitter_rate[key].append(time.time())
+            self.rule_last_seen[key] = time.time()
+            if self.rate_limiter[key].consume(key):
+                if key not in self.kafka_rate:
+                    self.kafka_rate[key] = deque()
+                self.kafka_rate[key].append(time.time())
+                filtered = False
+        return filtered
+
     def _main_loop(self):
         """
         Main loop for iterating through the twitter data.
@@ -1296,22 +1358,34 @@ def _main_loop(self):
                         'tweet_id': tweet.get('id_str', None)
                     }))
                     enriched_data = self._enrich_tweet(tweet)
-                    # #4204 - since 1.4.13
-                    theLogMsg = settings.DWC_SEND_TO_KAFKA_ENRICHED
-                    self.logger.info(theLogMsg, extra=logExtra())
-                    if self.kafka_enabled:
-                        try:
-                            self._send_enriched_data_to_kafka(tweet, enriched_data)
-                        except Exception as e:
-                            theLogMsg = settings.DWC_ERROR_SEND_TO_KAFKA
-                            self.logger.error(theLogMsg, extra=logExtra(e))
-                            dd_monitoring.increment('tweet_to_kafka_failure',
-                                                    tags=['error_type:kafka'])
+
+                    if not self._is_filtered(enriched_data):
+                        # #4204 - since 1.4.13
+                        theLogMsg = settings.DWC_SEND_TO_KAFKA_ENRICHED
+                        self.logger.info(theLogMsg, extra=logExtra())
+                        if self.kafka_enabled:
+                            try:
+                                self._send_enriched_data_to_kafka(tweet, enriched_data)
+                            except Exception as e:
+                                theLogMsg = settings.DWC_ERROR_SEND_TO_KAFKA
+                                self.logger.error(theLogMsg, extra=logExtra(e))
+                                dd_monitoring.increment('tweet_to_kafka_failure',
+                                                        tags=['error_type:kafka'])
+                        else:
+                            self.logger.debug(json.dumps(enriched_data, indent=2))
                     else:
-                        self.logger.debug(json.dumps(enriched_data, indent=2))
+                        self.logger.debug("Tweet Rate Filtered", extra=logExtra({
+                            'value_str': json.dumps(enriched_data, indent=2)
+                        }))
+
                 else:
                     self.logger.info("Stream keep-alive received", extra=logExtra())
 
+            if time.time() > self.last_filter_maintenance + settings.RATE_LIMITING_REPORTING_INTERVAL_SEC:
+                self.last_filter_maintenance = time.time()
+                self._filter_maintenance(settings.RATE_LIMITING_REPORTING_INTERVAL_SEC)
+                self.log_tweet_rates()
+
             if self.exit:
                 break
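For context before the next patch: the filtering introduced above leans entirely on the token_bucket library pinned in requirements.txt (token-bucket==0.2.0), using only the Limiter(rate, capacity, storage) and consume(key) calls that test_token_bucket exercises. A minimal standalone sketch of that pattern follows; the key name, loop count, and sleep interval are illustrative, and the rate/capacity of 10 simply mirror the RATE_LIMITING_* defaults added to settings.py:

    import time
    import token_bucket

    # One shared storage; the library tracks the remaining tokens per key.
    storage = token_bucket.MemoryStorage()
    limiter = token_bucket.Limiter(10, 10, storage)

    sent = dropped = 0
    for _ in range(30):
        # consume() returns True while tokens remain (forward the tweet)
        # and False once the per-key budget is spent (filter the tweet).
        if limiter.consume('air force'):
            sent += 1
        else:
            dropped += 1
        time.sleep(.05)

    # Roughly: an initial burst up to capacity (10), then ~10 tokens/sec refill.
    print(sent, dropped)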
From fd7862954f3cf458ec680d2258fcee27610e30ef Mon Sep 17 00:00:00 2001
From: Andrew Carter
Date: Thu, 18 Mar 2021 16:52:11 -0400
Subject: [PATCH 02/14] Updated rule matching logic

---
 tests/test_traptor_offline.py |   6 +-
 traptor/traptor.py            | 433 +++++++++++++++------------------
 2 files changed, 193 insertions(+), 246 deletions(-)

diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py
index d443614..66142d7 100644
--- a/tests/test_traptor_offline.py
+++ b/tests/test_traptor_offline.py
@@ -642,7 +642,7 @@ def test_token_bucket(self):
     def test_is_filtered_one_rule_value(self, traptor):
         enriched_data = {
             "traptor": {
-                "rule_value": "air force",
+                "collection_rules": [{"value": "air force"}]
             }
         }
         for i in range(100):
@@ -658,7 +658,7 @@ def test_is_filtered_one_rule_value(self, traptor):
     def test_is_filtered_dummy(self, traptor):
         enriched_data = {
             "traptor": {
-                "rule_value": "air force",
+                "collection_rules": [{"value": "air force"}]
             }
         }
 
@@ -667,7 +667,7 @@ def test_is_filtered_dummy(self, traptor):
         rate_limiter = dict()
         rule_last_seen = dict()
 
-        key = enriched_data['traptor']['rule_value']
+        key = enriched_data['traptor']['collection_rules'][0]['value']
         rule_last_seen[key] = datetime.now()
 
         traptor.logger = MagicMock()
diff --git a/traptor/traptor.py b/traptor/traptor.py
index fe6987d..6745a58 100644
--- a/traptor/traptor.py
+++ b/traptor/traptor.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
+import copy
 import json
 import signal
 import sys
@@ -701,295 +702,236 @@ def _get_locations_traptor_rule(self):
 
         return locations_rule
 
-    def _find_rule_matches(self, tweet_dict):
+    def _get_url_fields(self, url_entities, collection):
+        """
+        Add the expanded and display URL strings from a list of URL entities
+        to the given collection.
+
+        :param url_entities: list of url entity dicts from a tweet
+        :type url_entities: list
+        :param collection: the set to add the URL strings to
+        :type collection: set
         """
-        Find a rule match for the tweet.
-        This code only expects there to be one match.  If there is more than
-        one, it will use the last one it finds since the first match will be
-        overwritten.
+        for item in url_entities:
+            value = item.get('expanded_url')
+
+            if value is not None:
+                collection.add(value)
 
-        :param dict tweet_dict: The dictionary twitter object.
-        :returns: a ``dict`` with the augmented data fields.
+            value = item.get('display_url')
+
+            if value is not None:
+                collection.add(value)
+
+    def _build_match_content(self, tweet):
         """
-        new_dict = tweet_dict
-        self.logger.debug('Finding tweet rule matches')
+        Assembles the content of the tweet into a searchable data structure. All content is put into lowercase.
 
-        # If the Traptor is a geo traptor, return the one rule we've already set up
-        if self.traptor_type == 'locations':
-            for key, value in self.locations_rule.iteritems():
-                new_dict['traptor'][key] = value
+        :param tweet: The dictionary twitter object.
+        :type tweet: dict
+        :rtype: dict
+        """
+        searchable = {
+            "hashtag": set(),
+            "keyword": u"",
+            "username": set(),
+            "userid": set()
+        }
 
-        # Do track Traptor enrichments...
-        elif self.traptor_type == 'track':
+        # Placeholder defaults to minimize allocations
+        _d = dict()
+        _l = list()
+        _s = ''
+
+        if self.traptor_type == 'track':
+
+            free_text = {tweet.get('value', _s),
+                         tweet.get('quoted_status', _d).get('extended_tweet', _d).get('full_text', _s),
+                         tweet.get('quoted_status', _d).get('text', _s),
+                         tweet.get('retweeted_status', _d).get('extended_tweet', _d).get('full_text', _s),
+                         tweet.get('retweeted_status', _d).get('text', _s),
+                         tweet.get('user', _d).get('screen_name', _s)}
+
+            self._get_url_fields(tweet.get('entities', _d).get('urls', _l), free_text)
+            self._get_url_fields(tweet.get('extended_tweet', _d).get('entities', _d).get('urls', _l), free_text)
+            self._get_url_fields(tweet.get('retweeted_status', _d).get('extended_tweet', _d).get('entities', _d).get('urls', _l), free_text)
+            self._get_url_fields(tweet.get('retweeted_status', _d).get('entities', _d).get('urls', _l), free_text)
+            self._get_url_fields(tweet.get('quoted_status', _d).get('extended_tweet', _d).get('entities', _d).get('urls', _l), free_text)
+            self._get_url_fields(tweet.get('quoted_status', _d).get('entities', _d).get('urls', _l), free_text)
+            self._get_url_fields(tweet.get('extended_tweet', _d).get('entities', _d).get('media', _l), free_text)
+            self._get_url_fields(tweet.get('entities', _d).get('media', _l), free_text)
+            self._get_url_fields(tweet.get('retweeted_status', _d).get('extended_tweet', _d).get('entities', _d).get('media', _l), free_text)
+            self._get_url_fields(tweet.get('retweeted_status', _d).get('entities', _d).get('media', _l), free_text)
+            self._get_url_fields(tweet.get('quoted_status', _d).get('extended_tweet', _d).get('entities', _d).get('media', _l), free_text)
+            self._get_url_fields(tweet.get('quoted_status', _d).get('entities', _d).get('media', _l), free_text)
+
+            searchable['keyword'] = u" ".join(free_text).lower()
+
+            for hashtag in tweet.get('extended_tweet', _d).get('entities', _d).get('hashtags', _l):
+                if 'text' in hashtag and hashtag['text'] is not None:
+                    searchable['hashtag'].add(hashtag.get('text').lower())
+
+            for hashtag in tweet.get('entities', _d).get('hashtags', _l):
+                if 'text' in hashtag and hashtag['text'] is not None:
+                    searchable['hashtag'].add(hashtag.get('text').lower())
+
+            for hashtag in tweet.get('retweeted_status', _d).get('extended_tweet', _d).get('entities', _d).get('hashtags', _l):
+                if 'text' in hashtag and hashtag['text'] is not None:
+                    searchable['hashtag'].add(hashtag.get('text').lower())
+
+            for hashtag in tweet.get('retweeted_status', _d).get('entities', _d).get('hashtags', _l):
+                if 'text' in hashtag and hashtag['text'] is not None:
+                    searchable['hashtag'].add(hashtag.get('text').lower())
+
+            for hashtag in tweet.get('quoted_status', _d).get('extended_tweet', _d).get('entities', _d).get('hashtags', _l):
+                if 'text' in hashtag and hashtag['text'] is not None:
+                    searchable['hashtag'].add(hashtag.get('text').lower())
+
+            for hashtag in tweet.get('quoted_status', _d).get('entities', _d).get('hashtags', _l):
+                if 'text' in hashtag and hashtag['text'] is not None:
+                    searchable['hashtag'].add(hashtag.get('text').lower())
+
+            searchable['hashtag'].remove(_s)
 
-            """
-            Here's how Twitter does it, and so shall we:
-
-            The text of the Tweet and some entity fields are considered for matches.
-            Specifically, the text attribute of the Tweet, expanded_url and display_url
-            for links and media, text for hashtags, and screen_name for user mentions
-            are checked for matches.
-            """
-
-            # Build up the query from our tweet fields
-            query = ""
-
-            # Tweet text
-            query = query + tweet_dict['text'].encode("utf-8")
-
-            # URLs and Media
-            url_list = []
-            if 'urls' in tweet_dict['entities']:
-                for url in tweet_dict['entities']['urls']:
-                    expanded_url = url.get('expanded_url', None)
-                    display_url = url.get('display_url', None)
-
-                    if expanded_url is not None:
-                        url_list.append(expanded_url)
-                    if display_url is not None:
-                        url_list.append(display_url)
-
-            if 'media' in tweet_dict['entities']:
-                for item in tweet_dict['entities']['media']:
-                    expanded_url = item.get('expanded_url', None)
-                    display_url = item.get('display_url', None)
-
-                    if expanded_url is not None:
-                        url_list.append(expanded_url)
-                    if display_url is not None:
-                        url_list.append(display_url)
-
-            # Hashtags
-            if 'hashtags' in tweet_dict['entities']:
-                for tag in tweet_dict['entities']['hashtags']:
-                    query = query + " " + tag['text'].encode("utf-8")
-
-            # Screen name
-            if 'screen_name' in tweet_dict['user']:
-                query = query + " " + tweet_dict['user']['screen_name'].encode('utf-8')
-
-            # Retweeted parts
-            if tweet_dict.get('retweeted_status', None) is not None:
-                # Status
-                query += " " + tweet_dict['retweeted_status']['text'].encode("utf-8")
-
-                theFullText = tweet_dict['retweeted_status']\
-                    .get('quoted_status', {})\
-                    .get('extended_tweet', {})\
-                    .get('full_text', None)
-                if theFullText is not None:
-                    query = " " + theFullText.encode("utf-8")
-
-                # URLs and Media
-                if 'urls' in tweet_dict['retweeted_status']['entities']:
-                    for url in tweet_dict['retweeted_status']['entities']['urls']:
-                        expanded_url = url.get('expanded_url', None)
-                        display_url = url.get('display_url', None)
-
-                        if expanded_url is not None:
-                            url_list.append(expanded_url)
-                        if display_url is not None:
-                            url_list.append(display_url)
-
-                if 'media' in tweet_dict['retweeted_status']['entities']:
-                    for item in tweet_dict['retweeted_status']['entities']['media']:
-                        expanded_url = item.get('expanded_url', None)
-                        display_url = item.get('display_url', None)
-
-                        if expanded_url is not None:
-                            url_list.append(expanded_url)
-                        if display_url is not None:
-                            url_list.append(display_url)
-
-                # Hashtags
-                if 'hashtags' in tweet_dict['retweeted_status']['entities']:
-                    for tag in tweet_dict['retweeted_status']['entities']['hashtags']:
-                        query = query + " " + tag['text'].encode("utf-8")
-
-                # Names
-                if 'in_reply_to_screen_name' in tweet_dict['retweeted_status']:
-                    in_reply_to_screen_name = tweet_dict.get('retweeted_status', {})\
-                        .get('in_reply_to_screen_name', None)
-                    if in_reply_to_screen_name is not None:
-                        query += " " + tweet_dict['retweeted_status']['in_reply_to_screen_name'].encode('utf-8')
-
-                if 'screen_name' in tweet_dict['retweeted_status']['user']:
-                    screen_name = tweet_dict.get('retweeted_status', {}).get('user', {}).get('screen_name', None)
-                    if screen_name is not None:
-                        query += " " + tweet_dict['retweeted_status']['user']['screen_name'].encode('utf-8')
-
-            # Quoted Status parts
-            if tweet_dict.get('quoted_status', None) is not None:
-                # Standard tweet
-                if tweet_dict.get('quoted_status').get('text', None) is not None:
-                    query = query + " " + tweet_dict['quoted_status']['text'].encode('utf-8')
-
-                # Extended tweet
-                if tweet_dict.get('quoted_status').get('extended_tweet', {}).get('full_text', None) is not None:
-                    query = query + " " + tweet_dict['quoted_status']['extended_tweet']['full_text'].encode('utf-8')
-
-            # De-dup urls and add to the giant query
-            if len(url_list) > 0:
-                url_list = set(url_list)
-                for url in url_list:
-                    query = query + " " + url.encode("utf-8")
-
-            # Lowercase the entire thing
-            query = query.lower()
-
-            random.shuffle(self.redis_rules)
+        elif self.traptor_type == 'follow':
 
-            try:
-                # Shuffle the rules every once in a while
-                for rule in self.redis_rules:
-                    # Get the rule to search for and lowercase it
-                    search_str = rule['value'].encode("utf-8").lower()
-
-                    # Split the rule value and see if it's a multi-parter
-                    part_finder = list()
-                    search_str_multi = search_str.split(" ")
-
-                    # If there is more than one part to the rule, check for each part in the query
-                    if len(search_str_multi) > 1:
-                        for part in search_str_multi:
-                            if part in query:
-                                part_finder.append(True)
-                            else:
-                                part_finder.append(False)
-
-                    if len(search_str_multi) > 1 and all(part_finder):
-                        # These two lines kept for backwards compatibility
-                        new_dict['traptor']['rule_tag'] = rule['tag']
-                        new_dict['traptor']['rule_value'] = rule['value'].encode("utf-8")
+            searchable['userid'].add(tweet.get('user', _d).get('id_str', _s))
+            searchable['userid'].add(tweet.get('retweeted_status', _d).get('user', _d).get('id_str', _s))
+            searchable['userid'].add(tweet.get('quoted_status', _d).get('user', _d).get('id_str', _s))
 
-                        # Pass all key/value pairs from matched rule through to Traptor
-                        for key, value in rule.iteritems():
-                            new_dict['traptor'][key] = value.encode("utf-8")
+            for user_mention in tweet.get('entities', _d).get('user_mentions', _l):
+                if 'id_str' in user_mention and user_mention['id_str'] is not None:
+                    searchable['userid'].add(user_mention.get('id_str'))
 
-                        # Log that a rule was matched
-                        self.logger.debug('Rule matched', extra=logExtra({
-                            'tweet id': tweet_dict['id_str']
-                        }))
+            for user_mention in tweet.get('extended_tweet', _d).get('entities', _d).get('user_mentions', _l):
+                if 'id_str' in user_mention and user_mention['id_str'] is not None:
+                    searchable['userid'].add(user_mention.get('id_str'))
 
-                    elif search_str in query:
-                        # These two lines kept for backwards compatibility
-                        new_dict['traptor']['rule_tag'] = rule['tag']
-                        new_dict['traptor']['rule_value'] = rule['value'].encode("utf-8")
+            for user_mention in tweet.get('retweeted_status', _d).get('extended_tweet', _d).get('entities', _d).get('user_mentions', _l):
+                if 'id_str' in user_mention and user_mention['id_str'] is not None:
+                    searchable['userid'].add(user_mention.get('id_str'))
 
-                        # Pass all key/value pairs from matched rule through to Traptor
-                        for key, value in rule.iteritems():
-                            new_dict['traptor'][key] = value.encode("utf-8")
+            for user_mention in tweet.get('retweeted_status', _d).get('entities', _d).get('user_mentions', _l):
+                if 'id_str' in user_mention and user_mention['id_str'] is not None:
+                    searchable['userid'].add(user_mention.get('id_str'))
 
-                        # Log that a rule was matched
-                        self.logger.debug('Rule matched', extra=logExtra({
-                            'tweet id': tweet_dict['id_str']
-                        }))
-            except Exception as e:
-                theLogMsg = "Caught exception while performing rule matching for track"
-                self.logger.error(theLogMsg, extra=logExtra(e))
-                dd_monitoring.increment('traptor_error_occurred',
-                                        tags=['error_type:rule_matching_failure'])
+            for user_mention in tweet.get('quoted_status', _d).get('extended_tweet', _d).get('entities', _d).get('user_mentions', _l):
+                if 'id_str' in user_mention and user_mention['id_str'] is not None:
+                    searchable['userid'].add(user_mention.get('id_str'))
 
-        # If this is a follow Traptor, only check the user/id field of the tweet
-        elif self.traptor_type == 'follow':
-            """
-            Here's how Twitter does it, and so shall we:
+            for user_mention in tweet.get('quoted_status', _d).get('entities', _d).get('user_mentions', _l):
+                if 'id_str' in user_mention and user_mention['id_str'] is not None:
+                    searchable['userid'].add(user_mention.get('id_str'))
 
-            Tweets created by the user.
-            Tweets which are retweeted by the user.
-            Replies to any Tweet created by the user.
-            Retweets of any Tweet created by the user.
-            Manual replies, created without pressing a reply button (e.g. “@twitterapi I agree”).
-            """
+            searchable['userid'].remove(_s)
 
-            # Build up the query from our tweet fields
-            query = ""
+            searchable['username'].add(tweet.get('user', _d).get('screen_name', _s).lower())
+            searchable['username'].add(tweet.get('retweeted_status', _d).get('user', _d).get('screen_name', _s).lower())
+            searchable['username'].add(tweet.get('quoted_status', _d).get('user', _d).get('screen_name', _s).lower())
 
-            # Tweets created by the user AND
-            # Tweets which are retweeted by the user
+            for user_mention in tweet.get('entities', _d).get('user_mentions', _l):
+                if 'screen_name' in user_mention and user_mention['screen_name'] is not None:
+                    searchable['username'].add(user_mention.get('screen_name').lower())
 
-            try:
-                theLogMsg = 'tweet_dict for rule match'
-                self.logger.debug(theLogMsg, extra=logExtra({
-                    'tweet_dict': json.dumps(tweet_dict).encode("utf-8")
-                }))
-            except Exception as e:
-                theLogMsg = "Unable to dump the tweet dict to json"
-                self.logger.error(theLogMsg, extra=logExtra(e))
-                dd_monitoring.increment('traptor_error_occurred',
-                                        tags=['error_type:json_dumps'])
+            for user_mention in tweet.get('extended_tweet', _d).get('entities', _d).get('user_mentions', _l):
+                if 'screen_name' in user_mention and user_mention['screen_name'] is not None:
+                    searchable['username'].add(user_mention.get('screen_name').lower())
+
+            for user_mention in tweet.get('retweeted_status', _d).get('extended_tweet', _d).get('entities', _d).get('user_mentions', _l):
+                if 'screen_name' in user_mention and user_mention['screen_name'] is not None:
+                    searchable['username'].add(user_mention.get('screen_name').lower())
 
-            # From this user
-            query += str(tweet_dict['user']['id_str'])
+            for user_mention in tweet.get('retweeted_status', _d).get('entities', _d).get('user_mentions', _l):
+                if 'screen_name' in user_mention and user_mention['screen_name'] is not None:
+                    searchable['username'].add(user_mention.get('screen_name').lower())
 
-            # Replies to any Tweet created by the user.
-            if tweet_dict['in_reply_to_user_id'] is not None \
-                    and tweet_dict['in_reply_to_user_id'] != '':
-                query += str(tweet_dict['in_reply_to_user_id'])
+            for user_mention in tweet.get('quoted_status', _d).get('extended_tweet', _d).get('entities', _d).get('user_mentions', _l):
+                if 'screen_name' in user_mention and user_mention['screen_name'] is not None:
+                    searchable['username'].add(user_mention.get('screen_name').lower())
 
-            # User mentions
-            if 'user_mentions' in tweet_dict['entities']:
-                for tag in tweet_dict['entities']['user_mentions']:
-                    id_str = tag.get('id_str')
-                    if id_str:
-                        query = query + " " + id_str.encode("utf-8")
+            for user_mention in tweet.get('quoted_status', _d).get('entities', _d).get('user_mentions', _l):
+                if 'screen_name' in user_mention and user_mention['screen_name'] is not None:
+                    searchable['username'].add(user_mention.get('screen_name').lower())
 
-            # Retweeted parts
-            if tweet_dict.get('retweeted_status', None) is not None:
-                if tweet_dict['retweeted_status'].get('user', {}).get('id_str', None) is not None:
-                    query += str(tweet_dict['retweeted_status']['user']['id_str'])
+            searchable['username'].remove(_s)
 
-            # Retweets of any Tweet created by the user; AND
-            # Manual replies, created without pressing a reply button (e.g. “@twitterapi I agree”).
-            query = query + tweet_dict['text'].encode("utf-8")
+        return searchable
 
-            # Lowercase the entire thing
-            query = query.lower()
+    def _find_rule_matches(self, tweet_dict):
+        """
+        Best effort search for rule matches for the tweet.
 
-            random.shuffle(self.redis_rules)
+        :param tweet_dict: The dictionary twitter object.
+        :type tweet_dict: dict
+        :returns: a dict with the augmented data fields.
+        :rtype: dict
+        """
+        new_dict = tweet_dict
+        self.logger.debug('Finding tweet rule matches')
+
+        collection_rules = list()
+        tweet_dict['traptor']['collection_rules'] = collection_rules
 
+        # If the Traptor is a geo traptor, return the one rule we've already set up
+        if self.traptor_type == 'locations':
+            rule = copy.deepcopy(self.locations_rule)
+            collection_rules.append(rule)
+
+        # Do track and follow Traptor rule tagging...
+        elif self.traptor_type == 'track' or self.traptor_type == 'follow':
             try:
+                content = self._build_match_content(tweet_dict)
+
                 for rule in self.redis_rules:
-                    # Get the rule to search for and lowercase it
-                    search_str = str(rule['value']).encode("utf-8").lower()
-                    self.logger.debug('rule matching', extra=logExtra({
-                        'dbg-search': search_str,
-                        'dbg-query': query
-                    }))
+                    rule_type = rule.get('orig_type')
+                    rule_value = rule.get('value').encode("utf-8").lower()
+                    value_terms = rule_value.split(u" ")
+                    matches = list()
+
+                    for search_term in value_terms:
+                        if rule_type in content:
+                            matches.append(search_term in content[rule_type])
+                        else:
+                            pass
+
+                    if len(matches) >=1 and all(matches):
 
-                    if search_str in query:
                         # These two lines kept for backwards compatibility
-                        new_dict['traptor']['rule_tag'] = rule['tag']
-                        new_dict['traptor']['rule_value'] = rule['value'].encode("utf-8")
+                        match = dict()
+                        match['rule_tag'] = rule.get('tag')
+                        match['rule_value'] = rule.get('value').encode("utf-8")
 
-                        # Pass all key/value pairs from matched rule through to Traptor
-                        for key, value in rule.iteritems():
-                            new_dict['traptor'][key] = value.encode("utf-8")
+                        # Copy all keys of the rule
+                        for key, value in rule.items():
+                            match[key] = value.encode("utf-8")
+
+                        collection_rules.append(match)
 
                         # Log that a rule was matched
-                        self.logger.debug('rule matched', extra=logExtra({
-                            'tweet id': tweet_dict['id_str']
+                        self.logger.debug('Rule matched', extra=logExtra({
+                            'tweet id': tweet_dict.get('id_str'),
+                            'rule_value': rule_value,
+                            'rule_id': rule.get('rule_id')
                         }))
+
             except Exception as e:
-                theLogMsg = "Caught exception while performing rule matching for follow"
+                theLogMsg = "Caught exception while performing rule matching for " + self.traptor_type
                 self.logger.error(theLogMsg, extra=logExtra(e))
                 dd_monitoring.increment('traptor_error_occurred',
                                         tags=['error_type:rule_matching_failure'])
 
-        # unknown traptor type
         else:
-            self.logger.warning("Ran into an unknown Traptor type...")
+            self.logger.error("Ran into an unknown Traptor type...",
+                              extra=logExtra({}))
 
-        if 'rule_tag' not in new_dict['traptor']:
+        if len(collection_rules) == 0:
             new_dict['traptor']['rule_type'] = self.traptor_type
             new_dict['traptor']['id'] = int(self.traptor_id)
             new_dict['traptor']['rule_tag'] = 'Not Found'
             new_dict['traptor']['rule_value'] = 'Not Found'
             # Log that a rule was matched
             self.logger.warning("No rule matched for tweet", extra=logExtra({
-                    'tweet_id': tweet_dict['id_str']
+                'tweet_id': tweet_dict['id_str']
             }))
 
         return new_dict
@@ -1313,7 +1255,12 @@ def _log_rates(self):
     def _is_filtered(self, enriched_data):
         # set of keys -> rule_values
         rule_values = set()
-        rule_values.add(enriched_data['traptor']['rule_value'])  # TODO look for more rule values
+
+        if 'collection_rules' in enriched_data['traptor']:
+            for rule in enriched_data['traptor']['collection_rules']:
+                if 'value' in rule:
+                    rule_values.add(rule.get('value'))
+
         filtered = True
         for key in rule_values:
             if key not in self.rate_limiter:
@@ -1324,7 +1271,7 @@ def _is_filtered(self, enriched_data):
             if key not in self.twitter_rate:
                 self.twitter_rate[key] = deque()
             self.twitter_rate[key].append(time.time())
-            self.rule_last_seen[key] = time.time()
+
             if self.rate_limiter[key].consume(key):
                 if key not in self.kafka_rate:
                     self.kafka_rate[key] = deque()
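The rework above replaces the single giant substring query with typed lookups: _build_match_content collects the tweet's free text, hashtags, user ids, and screen names into one structure, and each rule's orig_type selects which field its value is matched against, where a multi-word value matches only if every term is found. A condensed sketch of that matching rule follows; the field contents and rules are hypothetical, made up here purely for illustration:

    # Shaped like the structure _build_match_content returns and the
    # Redis rules with their new orig_type field.
    content = {
        "keyword": u"happy tweet about the air force",  # lowercased free text
        "hashtag": {u"happy"},                          # lowercased tags
        "userid": {u"17919972"},                        # id_str values
        "username": {u"someuser"},                      # lowercased names
    }

    rules = [
        {"orig_type": "keyword", "value": "air force"},
        {"orig_type": "userid", "value": "17919972"},
        {"orig_type": "keyword", "value": "sad"},
    ]

    matched = []
    for rule in rules:
        terms = rule["value"].lower().split(u" ")
        field = content.get(rule["orig_type"])
        # "in" is a substring test against the keyword string and a
        # membership test against the set-valued fields.
        if field is not None and all(term in field for term in terms):
            matched.append(rule["value"])

    print(matched)  # ['air force', '17919972']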
"orig_type": "geo", "rule_type": "locations", "rule_id": 12346} \ No newline at end of file diff --git a/tests/data/track_rules.json b/tests/data/track_rules.json index 37e947b..58b493f 100644 --- a/tests/data/track_rules.json +++ b/tests/data/track_rules.json @@ -1 +1 @@ -{"tag": "test", "value": "happy", "status": "active", "description": "Tweets for a hashtag", "appid": "test-appid", "date_added": "2016-05-10 16:58:34", "rule_type": "track", "rule_id": 12347} +{"tag": "test", "value": "happy", "status": "active", "description": "Tweets for a hashtag", "appid": "test-appid", "date_added": "2016-05-10 16:58:34", "orig_type": "keyword", "rule_type": "track", "rule_id": 12347} diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py index 66142d7..8b4e87b 100644 --- a/tests/test_traptor_offline.py +++ b/tests/test_traptor_offline.py @@ -200,20 +200,20 @@ def test_track(self, redis_rules): """Test retrieving the tracking rules.""" assert {'tag': 'test', 'value': 'happy', 'status': 'active', 'description': 'Tweets for a hashtag', 'appid': 'test-appid', 'date_added': '2016-05-10 16:58:34', - 'rule_type': 'track', 'rule_id': '12347'} == redis_rules.hgetall('traptor-track:0:0') + 'rule_type': 'track', 'orig_type': 'keyword', 'rule_id': '12347'} == redis_rules.hgetall('traptor-track:0:0') def test_follow(self, redis_rules): """Test retrieving the follow rules.""" assert {'tag': 'test', 'value': '17919972', 'status': 'active', 'description': 'Tweets from some user', 'appid': 'test-appid', 'date_added': '2016-05-10 16:58:34', - 'rule_type': 'follow', 'rule_id': '12345'} == redis_rules.hgetall('traptor-follow:0:0') + 'rule_type': 'follow', 'orig_type': 'userid', 'rule_id': '12345'} == redis_rules.hgetall('traptor-follow:0:0') def test_locations(self, redis_rules): """Test retrieving the location rules.""" assert {'tag': 'test', 'value': '-122.75,36.8,-121.75,37.8', 'status': 'active', 'description': 'Tweets from some continent', 'appid': 'test-appid', 'date_added': '2016-05-10 16:58:34', - 'rule_type': 'locations', 'rule_id': '12346'} == redis_rules.hgetall('traptor-locations:0:0') + 'rule_type': 'locations', 'orig_type': 'geo', 'rule_id': '12346'} == redis_rules.hgetall('traptor-locations:0:0') class TestTraptor(object): @@ -271,17 +271,17 @@ def test_redis_rules(self, redis_rules, traptor): assert traptor.redis_rules == [{'tag': 'test', 'value': 'happy', 'status': 'active', 'description': 'Tweets for a hashtag', 'appid': 'test-appid', 'date_added': '2016-05-10 16:58:34', 'rule_type': 'track', - 'rule_id': '12347'}] + 'orig_type': 'keyword', 'rule_id': '12347'}] if traptor.traptor_type == 'follow': assert traptor.redis_rules == [{'tag': 'test', 'value': '17919972', 'status': 'active', 'description': 'Tweets from some user', 'appid': 'test-appid', 'date_added': '2016-05-10 16:58:34', 'rule_type': 'follow', - 'rule_id': '12345'}] + 'orig_type': 'userid', 'rule_id': '12345'}] if traptor.traptor_type == 'locations': assert traptor.redis_rules == [{'tag': 'test', 'value': '-122.75,36.8,-121.75,37.8', 'status': 'active', 'description': 'Tweets from some continent', 'appid': 'test-appid', 'date_added': '2016-05-10 16:58:34', 'rule_type': 'locations', - 'rule_id': '12346'}] + 'orig_type': 'geo', 'rule_id': '12346'}] def test_twitter_rules(self, redis_rules, traptor): """Ensure Traptor can create Twitter rules from the Redis rules.""" @@ -345,7 +345,8 @@ def test_ensure_traptor_builds_the_correct_filter_string(self, traptor): { "rule_id": "1", "value": "happy", - "rule_type": "track" + 
"rule_type": "track", + "orig_type": "keyword" }, { "rule_id": "2", @@ -509,36 +510,36 @@ def test_main_loop(self, redis_rules, traptor, tweets): # This is actually testing the rule matching if traptor.traptor_type == 'track': assert enriched_data['traptor']['created_at_iso'] == '2016-02-22T01:34:53+00:00' - assert enriched_data['traptor']['rule_tag'] == 'test' - assert enriched_data['traptor']['rule_value'] == 'happy' - assert enriched_data['traptor']['rule_type'] == 'track' - assert enriched_data['traptor']['tag'] == 'test' - assert enriched_data['traptor']['value'] == 'happy' - assert enriched_data['traptor']['status'] == 'active' - assert enriched_data['traptor']['description'] == 'Tweets for a hashtag' - assert enriched_data['traptor']['appid'] == 'test-appid' + assert enriched_data['traptor']['collection_rules'][0]['rule_tag'] == 'test' + assert enriched_data['traptor']['collection_rules'][0]['rule_value'] == 'happy' + assert enriched_data['traptor']['collection_rules'][0]['rule_type'] == 'track' + assert enriched_data['traptor']['collection_rules'][0]['tag'] == 'test' + assert enriched_data['traptor']['collection_rules'][0]['value'] == 'happy' + assert enriched_data['traptor']['collection_rules'][0]['status'] == 'active' + assert enriched_data['traptor']['collection_rules'][0]['description'] == 'Tweets for a hashtag' + assert enriched_data['traptor']['collection_rules'][0]['appid'] == 'test-appid' if traptor.traptor_type == 'follow': assert enriched_data['traptor']['created_at_iso'] == '2016-02-20T03:52:59+00:00' - assert enriched_data['traptor']['rule_tag'] == 'test' - assert enriched_data['traptor']['rule_value'] == '17919972' - assert enriched_data['traptor']['rule_type'] == 'follow' - assert enriched_data['traptor']['tag'] == 'test' - assert enriched_data['traptor']['value'] == '17919972' - assert enriched_data['traptor']['status'] == 'active' - assert enriched_data['traptor']['description'] == 'Tweets from some user' - assert enriched_data['traptor']['appid'] == 'test-appid' + assert enriched_data['traptor']['collection_rules'][0]['rule_tag'] == 'test' + assert enriched_data['traptor']['collection_rules'][0]['rule_value'] == '17919972' + assert enriched_data['traptor']['collection_rules'][0]['rule_type'] == 'follow' + assert enriched_data['traptor']['collection_rules'][0]['tag'] == 'test' + assert enriched_data['traptor']['collection_rules'][0]['value'] == '17919972' + assert enriched_data['traptor']['collection_rules'][0]['status'] == 'active' + assert enriched_data['traptor']['collection_rules'][0]['description'] == 'Tweets from some user' + assert enriched_data['traptor']['collection_rules'][0]['appid'] == 'test-appid' if traptor.traptor_type == 'locations': assert enriched_data['traptor']['created_at_iso'] == '2016-02-23T02:02:54+00:00' - assert enriched_data['traptor']['rule_tag'] == 'test' - assert enriched_data['traptor']['rule_value'] == '-122.75,36.8,-121.75,37.8' - assert enriched_data['traptor']['rule_type'] == 'locations' - assert enriched_data['traptor']['tag'] == 'test' - assert enriched_data['traptor']['value'] == '-122.75,36.8,-121.75,37.8' - assert enriched_data['traptor']['status'] == 'active' - assert enriched_data['traptor']['description'] == 'Tweets from some continent' - assert enriched_data['traptor']['appid'] == 'test-appid' + assert enriched_data['traptor']['collection_rules'][0]['rule_tag'] == 'test' + assert enriched_data['traptor']['collection_rules'][0]['rule_value'] == '-122.75,36.8,-121.75,37.8' + assert 
enriched_data['traptor']['collection_rules'][0]['rule_type'] == 'locations' + assert enriched_data['traptor']['collection_rules'][0]['tag'] == 'test' + assert enriched_data['traptor']['collection_rules'][0]['value'] == '-122.75,36.8,-121.75,37.8' + assert enriched_data['traptor']['collection_rules'][0]['status'] == 'active' + assert enriched_data['traptor']['collection_rules'][0]['description'] == 'Tweets from some continent' + assert enriched_data['traptor']['collection_rules'][0]['appid'] == 'test-appid' @pytest.mark.extended def test_main_loop_extended(self, redis_rules, traptor, extended_tweets): @@ -560,7 +561,7 @@ def test_main_loop_extended(self, redis_rules, traptor, extended_tweets): # Do the rule matching against the redis rules enriched_data = traptor._enrich_tweet(tweet) - assert enriched_data['traptor']['rule_value'] == '735369652956766200' + assert enriched_data['traptor']['collection_rules'][0]['rule_value'] == '735369652956766200' if traptor.traptor_type == 'track': with open('tests/data/extended_tweets/track_rules.json') as f: @@ -570,7 +571,7 @@ def test_main_loop_extended(self, redis_rules, traptor, extended_tweets): # Do the rule matching against the redis rules enriched_data = traptor._enrich_tweet(tweet) - assert enriched_data['traptor']['rule_value'] == 'tweet' + assert enriched_data['traptor']['collection_rules'][0]['rule_value'] == 'tweet' def test_invalid_follow_returns_blank(self, traptor): traptor.logger = MagicMock() @@ -679,7 +680,7 @@ def test_is_filtered_dummy(self, traptor): # thread interval every 2 minutes or function in traptor every 10,000 tweets # function filter_maintance - for key, value in rule_last_seen: + for key, value in rule_last_seen.items(): if upper_bound >= value >= lower_bound: print('Last seen less than 2 minutes ago') @@ -707,4 +708,4 @@ def test_is_filtered_dummy(self, traptor): for i in range(100): traptor._is_filtered(enriched_data) # number of trues match behavior expected - assert False + #assert False diff --git a/traptor/dd_monitoring.py b/traptor/dd_monitoring.py index 5b81223..c2471d1 100644 --- a/traptor/dd_monitoring.py +++ b/traptor/dd_monitoring.py @@ -4,6 +4,7 @@ ############################################################################### import os from datadog import statsd, initialize +import settings traptor_type = os.getenv('TRAPTOR_TYPE', 'track') traptor_id = os.getenv('TRAPTOR_ID', '0') @@ -17,7 +18,9 @@ options = { 'statsd_host': os.getenv('STATSD_HOST_IP', 'statsd') } -initialize(**options) + +if settings.DW_ENABLED: + initialize(**options) # CONST metric names and their actual dd value. 
DATADOG_METRICS = { diff --git a/traptor/settings.py b/traptor/settings.py index 3c1f4f7..a5a9cb7 100644 --- a/traptor/settings.py +++ b/traptor/settings.py @@ -50,7 +50,7 @@ RATE_LIMITING_ENABLED = bool(os.getenv('RATE_LIMITING_ENABLED', 'False') == 'True') RATE_LIMITING_RATE_SEC = max(1.0, float(os.getenv('RATE_LIMITING_RATE_SEC', 10.0))) RATE_LIMITING_CAPACITY = max(1, int(os.getenv('RATE_LIMITING_CAPACITY', 10))) -RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC'))) +RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC', 120.0))) # Manager API diff --git a/traptor/traptor.py b/traptor/traptor.py index 6745a58..1b24a2e 100644 --- a/traptor/traptor.py +++ b/traptor/traptor.py @@ -744,7 +744,7 @@ def _build_match_content(self, tweet): if self.traptor_type == 'track': - free_text = {tweet.get('value', _s), + free_text = {tweet.get('text', _s), tweet.get('quoted_status', _d).get('extended_tweet', _d).get('full_text', _s), tweet.get('quoted_status', _d).get('text', _s), tweet.get('retweeted_status', _d).get('extended_tweet', _d).get('full_text', _s), @@ -764,6 +764,8 @@ def _build_match_content(self, tweet): self._get_url_fields(tweet.get('quoted_status', _d).get('extended_tweet', _d).get('entities', _d).get('media', _l), free_text) self._get_url_fields(tweet.get('quoted_status', _d).get('entities', _d).get('media', _l), free_text) + if _s in free_text: + free_text.remove(_s) searchable['keyword'] = u" ".join(free_text).lower() for hashtag in tweet.get('extended_tweet', _d).get('entities', _d).get('hashtags', _l): @@ -790,7 +792,8 @@ def _build_match_content(self, tweet): if 'text' in hashtag and hashtag['text'] is not None: searchable['hashtag'].add(hashtag.get('text').lower()) - searchable['hashtag'].remove(_s) + if _s in searchable['hashtag']: + searchable['hashtag'].remove(_s) elif self.traptor_type == 'follow': @@ -822,7 +825,8 @@ def _build_match_content(self, tweet): if 'id_str' in user_mention and user_mention['id_str'] is not None: searchable['userid'].add(user_mention.get('id_str')) - searchable['userid'].remove(_s) + if _s in searchable['userid']: + searchable['userid'].remove(_s) searchable['username'].add(tweet.get('user', _d).get('screen_name', _s).lower()) searchable['username'].add(tweet.get('retweeted_status', _d).get('user', _d).get('screen_name', _s).lower()) @@ -852,7 +856,8 @@ def _build_match_content(self, tweet): if 'screen_name' in user_mention and user_mention['screen_name'] is not None: searchable['username'].add(user_mention.get('screen_name').lower()) - searchable['username'].remove(_s) + if _s in searchable['username']: + searchable['username'].remove(_s) return searchable @@ -896,14 +901,11 @@ def _find_rule_matches(self, tweet_dict): if len(matches) >=1 and all(matches): + match = copy.deepcopy(rule) + # These two lines kept for backwards compatibility - match = dict() match['rule_tag'] = rule.get('tag') - match['rule_value'] = rule.get('value').encode("utf-8") - - # Copy all keys of the rule - for key, value in rule.items(): - match[key] = value.encode("utf-8") + match['rule_value'] = rule.get('value') collection_rules.append(match) From 388f7fd617a3bfffce70d93b884a2cef5e73a62e Mon Sep 17 00:00:00 2001 From: pklene96 Date: Mon, 22 Mar 2021 16:36:16 -0400 Subject: [PATCH 04/14] Updated unit test, still need more work done --- sample_docker_environment_file.env | 22 ----------- tests/test_traptor_offline.py | 54 ++++++++++++++++++++++----- traptor/settings.py | 
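With this patch in place, a matched tweet carries every matching rule in traptor.collection_rules instead of the old single top-level rule_tag/rule_value fields (those two keys are kept per rule for backwards compatibility). A sketch of the enriched shape the updated assertions check, with the field values taken from the track-rule test fixtures above:

    enriched = {
        "traptor": {
            "created_at_iso": "2016-02-22T01:34:53+00:00",
            "collection_rules": [{
                "rule_tag": "test",     # kept for backwards compatibility
                "rule_value": "happy",  # kept for backwards compatibility
                "tag": "test",
                "value": "happy",
                "status": "active",
                "description": "Tweets for a hashtag",
                "appid": "test-appid",
                "orig_type": "keyword",
                "rule_type": "track",
                "rule_id": "12347",
            }],
        }
    }

    # _is_filtered() now derives the keys to rate-limit on from this list:
    rule_values = {r["value"] for r in enriched["traptor"]["collection_rules"] if "value" in r}
    assert rule_values == {"happy"}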
From 388f7fd617a3bfffce70d93b884a2cef5e73a62e Mon Sep 17 00:00:00 2001
From: pklene96
Date: Mon, 22 Mar 2021 16:36:16 -0400
Subject: [PATCH 04/14] Updated unit tests; more work still needed

---
 sample_docker_environment_file.env | 22 -----------
 tests/test_traptor_offline.py      | 54 ++++++++++++++++++++++-----
 traptor/settings.py                |  4 ++
 traptor/traptor.py                 | 60 +++++++++++++++++++++---------
 4 files changed, 90 insertions(+), 50 deletions(-)
 delete mode 100644 sample_docker_environment_file.env

diff --git a/sample_docker_environment_file.env b/sample_docker_environment_file.env
deleted file mode 100644
index 869b252..0000000
--- a/sample_docker_environment_file.env
+++ /dev/null
@@ -1,22 +0,0 @@
-# 1 - Rename this file to traptor_envs.env so it will be picked up on by docker compose
-# 2 - Fill in all relevant details, especially the Twitter API key values at the bottom
-TRAPTOR_TYPE=track
-TRAPTOR_ID=0
-RULE_CHECK_INTERVAL=60
-USE_SENTRY='false'
-SENTRY_URL=https://www.google.com
-LOG_LEVEL=INFO
-LOG_DIR=/var/log/traptor
-LOG_FILE=traptor.log
-KAFKA_ENABLED=true
-KAFKA_HOSTS=kafka:9092
-KAFKA_TOPIC=traptor.testing
-REDIS_HOST=redis
-REDIS_PORT=6379
-REDIS_DB=5
-REDIS_PUBSUB_CHANNEL=traptor-test
-CONSUMER_KEY=FILL_THIS_IN
-CONSUMER_SECRET=FILL_THIS_IN
-ACCESS_TOKEN=FILL_THIS_IN
-ACCESS_TOKEN_SECRET=FILL_THIS_IN
-STATSD_HOST_IP=172.17.0.1
diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py
index 8b4e87b..c968ae4 100644
--- a/tests/test_traptor_offline.py
+++ b/tests/test_traptor_offline.py
@@ -1,14 +1,16 @@
 """Traptor unit tests."""
 # To run with autotest and coverage and print all output to console run:
 #   py.test -s --cov=traptor --looponfail tests/
+from collections import deque
 from datetime import datetime, timedelta
+import time
 import os
 import json
 
 import token_bucket
 from redis import StrictRedis, ConnectionError
 import pytest
-from mock import MagicMock
+from mock import MagicMock, call
 import mockredis
 from tenacity import wait_none
@@ -640,6 +642,40 @@ def test_token_bucket(self):
             import time
             time.sleep(.05)
 
+    def test_rate_logger(self, traptor):
+        traptor.kafka_rate['test'] = deque([1, 2, 3])
+        traptor.twitter_rate['test'] = deque([1, 2, 3])
+        traptor.logger = MagicMock()
+        traptor._log_rates()
+        assert traptor.logger.method_calls ==[call.info('Twitter Rate', extra={'tps': 1, 'rule_value': 'test', 'component': 'traptor', 'traptor_version': '4.0.6.2', 'tags': ['traptor_type:None', 'traptor_id:None']}),
+                                              call.info('Kafka Rate', extra={'tps': 1, 'rule_value': 'test', 'component': 'traptor', 'traptor_version': '4.0.6.2', 'tags': ['traptor_type:None', 'traptor_id:None']})]
+
+    def test_rate_logger_out_of_range(self, traptor):
+        traptor.kafka_rate['test'] = deque()
+        traptor.twitter_rate['test'] = deque()
+        traptor.logger = MagicMock()
+        traptor._log_rates()
+        assert traptor.logger.method_calls == []
+
+    def test_rate_logger_length_one(self, traptor):
+        traptor.kafka_rate['test'] = deque([2])
+        traptor.twitter_rate['test'] = deque([2])
+        traptor.logger = MagicMock()
+        traptor._log_rates()
+        assert traptor.logger.method_calls == [call.info('Twitter Rate', extra={'tps': 1, 'rule_value': 'test', 'component': 'traptor', 'traptor_version': '4.0.6.2','tags': ['traptor_type:None', 'traptor_id:None']}),
+                                               call.info('Kafka Rate', extra={'tps': 1, 'rule_value': 'test', 'component': 'traptor', 'traptor_version': '4.0.6.2', 'tags': ['traptor_type:None', 'traptor_id:None']})]
+
+    def test_filter_maintenance(self, traptor):
+        now = time.time()
+        traptor.twitter_rate['test'] = deque([time.time() - 360, time.time() - 240, time.time() - 120, now])
+        traptor.kafka_rate['test'] = deque([time.time() - 360, time.time() - 240, time.time() - 120, now])
+
+        traptor._filter_maintenance()
+
+        assert traptor.twitter_rate['test'] == deque([now])
+        assert traptor.kafka_rate['test'] == deque([now])
+
+        traptor._filter_maintenance()
+
     def test_is_filtered_one_rule_value(self, traptor):
         enriched_data = {
@@ -647,14 +683,11 @@ def test_is_filtered_one_rule_value(self, traptor):
             }
         }
         for i in range(100):
-            if not traptor._is_filtered(enriched_data):
-                print("Write to Kafka")
-            else:
-                print("Filter")
-            import time
+            traptor._is_filtered(enriched_data)
             time.sleep(.01)
-        print(len(traptor.twitter_rate["air force"]))
-        print(len(traptor.kafka_rate["air force"]))
+
+        assert len(traptor.twitter_rate['air force']) == 100
+        assert len(traptor.kafka_rate['air force']) == 21
 
     def test_is_filtered_dummy(self, traptor):
         enriched_data = {
@@ -680,8 +713,9 @@ def test_is_filtered_dummy(self, traptor):
 
             # thread interval every 2 minutes or function in traptor every 10,000 tweets
             # function filter_maintenance
-            for key, value in rule_last_seen.items():
+
+            for key, value in rule_last_seen:
                 if upper_bound >= value >= lower_bound:
                     print('Last seen less than 2 minutes ago')
                     print(rule_last_seen)
@@ -708,4 +742,4 @@ def test_is_filtered_dummy(self, traptor):
         for i in range(100):
             traptor._is_filtered(enriched_data)
         # number of trues match behavior expected
-        #assert False
+
diff --git a/traptor/settings.py b/traptor/settings.py
index a5a9cb7..d4c737a 100644
--- a/traptor/settings.py
+++ b/traptor/settings.py
@@ -50,8 +50,12 @@
 RATE_LIMITING_ENABLED = bool(os.getenv('RATE_LIMITING_ENABLED', 'False') == 'True')
 RATE_LIMITING_RATE_SEC = max(1.0, float(os.getenv('RATE_LIMITING_RATE_SEC', 10.0)))
 RATE_LIMITING_CAPACITY = max(1, int(os.getenv('RATE_LIMITING_CAPACITY', 10)))
+
 RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC', 120.0)))
 
+RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC', 1)))
+
+
 # Manager API
 API_PORT = os.getenv('API_PORT', 5000)
diff --git a/traptor/traptor.py b/traptor/traptor.py
index 1b24a2e..e796eab 100644
--- a/traptor/traptor.py
+++ b/traptor/traptor.py
@@ -1229,30 +1229,55 @@ def _send_enriched_data_to_kafka(self, tweet, enriched_data):
     def _filter_maintenance(self, expiration_age_sec=120):
         expiration_time = time.time() - expiration_age_sec
-        for key, value in self.twitter_rate:
+        for key, value in self.twitter_rate.items():
             if value[-1] <= expiration_time:
                 del self.kafka_rate[key], self.rate_limiter[key], self.twitter_rate[key]
+                continue
             while value and value[0] <= expiration_time:
                 value.popleft()
-        for key, value in self.kafka_rate:
+        for key, value in self.kafka_rate.items():
             while value and value[0] <= expiration_time:
                 value.popleft()
 
-    def _log_rates(self):
-        for key, value in self.twitter_rate:
-            # Edge cases
-            tps = len(value) / (value[-1] - value[0]) #edge cases
-            self.logger.info("Twitter Rate", extra=logExtra({
-                'rule_value': key,
-                'tps': tps
-            }))
-        for key, value in self.kafka_rate:
+    # add parameter for time range ev
+    # Average tps and unit test
+
+    def _log_rates(self, evaluation_window):
+        for key, value in self.twitter_rate.items():
             # Edge cases
-            tps = len(value) / (value[-1] - value[0]) #edge cases
-            self.logger.info("Kafka Rate", extra=logExtra({
-                'rule_value': key,
-                'tps': tps
-            }))
+            # IndexError: deque index out of range if value length is 0
+            # ZeroDivisionError: integer division or modulo by zero: if only one item in value
+            # ZeroDivisionError: integer division or modulo by zero: if value[-1] and value[0] are equal
+            if len(value) == 1:
+                tps = len(value)/evaluation_window # evaluation window
+                self.logger.info("Twitter Rate", extra=logExtra({
+                    'rule_value': key,
+                    'average_tps': tps
+                }))
+            elif len(value) > 0:
+                # Max min or mean?
+                # still haven't figured out the min or max
+                #first_window = time.time() - evaluation_window
+
+                average_tps = len(value) / evaluation_window
+                self.logger.info("Twitter Rate", extra=logExtra({
+                    'rule_value': key,
+                    'average_tps': average_tps
+                }))
+
+        for key, value in self.kafka_rate.items():
+            if len(value) == 1:
+                tps = len(value)
+                self.logger.info("Kafka Rate", extra=logExtra({
+                    'rule_value': key,
+                    'tps': tps
+                }))
+            elif len(value) > 0:
+                tps = len(value) / (value[-1] - value[0])
+                self.logger.info("Kafka Rate", extra=logExtra({
+                    'rule_value': key,
+                    'tps': tps
+                }))
 
     def _is_filtered(self, enriched_data):
         # set of keys -> rule_values
@@ -1269,7 +1294,6 @@ def _is_filtered(self, enriched_data):
                 storage = token_bucket.MemoryStorage()
                 limiter = token_bucket.Limiter(settings.RATE_LIMITING_RATE_SEC, settings.RATE_LIMITING_CAPACITY, storage)
                 self.rate_limiter[key] = limiter
-            # tail - head / number of elements
             if key not in self.twitter_rate:
                 self.twitter_rate[key] = deque()
             self.twitter_rate[key].append(time.time())
@@ -1333,7 +1357,7 @@ def _main_loop(self):
             if time.time() > self.last_filter_maintenance + settings.RATE_LIMITING_REPORTING_INTERVAL_SEC:
                 self.last_filter_maintenance = time.time()
                 self._filter_maintenance(settings.RATE_LIMITING_REPORTING_INTERVAL_SEC)
-                self.log_tweet_rates()
+                self.log_tweet_rates(settings.RATE_LIMITING_REPORTING_INTERVAL_SEC)
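The bookkeeping above keeps one deque of arrival timestamps per rule value: _filter_maintenance prunes entries older than the window (and, after this patch, skips to the next key once a stale rule is deleted), while the rate logging divides the surviving count by the evaluation window. A self-contained sketch of that windowed-rate idea follows; the 60-second window is an assumption matching the RATE_LIMITING_REPORTING_INTERVAL_SEC default rather than anything fixed by this patch:

    import time
    from collections import deque

    WINDOW_SEC = 60.0
    twitter_rate = {"air force": deque()}

    def maintain(now):
        # Drop timestamps outside the window; forget idle keys entirely.
        expiration = now - WINDOW_SEC
        for key in list(twitter_rate):
            window = twitter_rate[key]
            if not window or window[-1] <= expiration:
                del twitter_rate[key]  # no recent traffic: stop tracking
                continue
            while window and window[0] <= expiration:
                window.popleft()

    def average_tps(key):
        return len(twitter_rate.get(key, ())) / WINDOW_SEC

    now = time.time()
    for age in (70.0, 30.0, 10.0, 0.0):
        twitter_rate["air force"].append(now - age)
    maintain(now)
    print(average_tps("air force"))  # 3 timestamps survive the 60s window -> 0.05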
'test', 'max_tps': 0.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.0, 'traptor_version': '4.0.6.2'}), + call.info('Kafka Rate', extra={'count': 0, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 0.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.0, 'traptor_version': '4.0.6.2'})] def test_rate_logger_length_one(self, traptor): traptor.kafka_rate['test'] = deque([2]) traptor.twitter_rate['test'] = deque([2]) traptor.logger = MagicMock() - traptor._log_rates() - assert traptor.logger.method_calls == [call.info('Twitter Rate', extra={'tps': 1, 'rule_value': 'test', 'component': 'traptor', 'traptor_version': '4.0.6.2','tags': ['traptor_type:None', 'traptor_id:None']}), - call.info('Kafka Rate', extra={'tps': 1, 'rule_value': 'test', 'component': 'traptor', 'traptor_version': '4.0.6.2', 'tags': ['traptor_type:None', 'traptor_id:None']})] + traptor._log_rates(4.0, 4.0) + assert traptor.logger.method_calls == [call.info('Twitter Rate', extra={'count': 1, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 0, 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.25, 'traptor_version': '4.0.6.2'}), + call.info('Kafka Rate', extra={'count': 1, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 0, 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.25, 'traptor_version': '4.0.6.2'})] def test_filter_maintenance(self, traptor): now = time.time() traptor.twitter_rate['test'] = deque([time.time() - 360, time.time() - 240, time.time() - 120, now]) traptor.kafka_rate['test'] = deque([time.time() - 360, time.time() - 240, time.time() - 120, now]) - traptor._filter_maintenance() + traptor._filter_maintenance(now, 30.0) assert traptor.twitter_rate['test'] == deque([now]) assert traptor.kafka_rate['test'] == deque([now]) - traptor._filter_maintenance() + traptor._filter_maintenance(now, 30.0) def test_is_filtered_one_rule_value(self, traptor): + enriched_data = { "traptor": { "collection_rules": [{"value": "air force"}] } } + + traptor.rate_limiting_enabled = True + for i in range(100): traptor._is_filtered(enriched_data) time.sleep(.01) @@ -715,7 +720,7 @@ def test_is_filtered_dummy(self, traptor): # function filter_maintance - for key, value in rule_last_seen: + for key, value in rule_last_seen.items(): if upper_bound >= value >= lower_bound: print('Last seen less than 2 minutes ago') print(rule_last_seen) diff --git a/traptor/settings.py b/traptor/settings.py index d4c737a..3ea1c71 100644 --- a/traptor/settings.py +++ b/traptor/settings.py @@ -50,11 +50,7 @@ RATE_LIMITING_ENABLED = bool(os.getenv('RATE_LIMITING_ENABLED', 'False') == 'True') RATE_LIMITING_RATE_SEC = max(1.0, float(os.getenv('RATE_LIMITING_RATE_SEC', 10.0))) RATE_LIMITING_CAPACITY = max(1, int(os.getenv('RATE_LIMITING_CAPACITY', 10))) - -RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC', 120.0))) - -RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC', 1))) - +RATE_LIMITING_REPORTING_INTERVAL_SEC = max(1.0, float(os.getenv('RATE_LIMITING_REPORTING_INTERVAL_SEC', 60.0))) # Manager API diff --git a/traptor/traptor.py b/traptor/traptor.py index e796eab..7afcf8f 100644 --- a/traptor/traptor.py +++ b/traptor/traptor.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import copy import json +import math import signal import sys import os @@ -202,6 +203,10 @@ def __init__( test=False, enable_stats_collection=True, 
heartbeat_interval=0,
+        rate_limiting_enabled=False,
+        rate_limiting_rate_sec=10.0,
+        rate_limiting_capacity=10,
+        rate_limiting_reporting_interval_sec=60.0
     ):
         """
         Traptor base class.
@@ -241,6 +246,10 @@ def __init__(
         self.kafka_enabled = kafka_enabled
         self.kafka_hosts = kafka_hosts
         self.kafka_topic = kafka_topic
+        self.rate_limiting_enabled = rate_limiting_enabled
+        self.rate_limiting_rate_sec = max(1.0, float(rate_limiting_rate_sec))
+        self.rate_limiting_capacity = max(1, int(rate_limiting_capacity))
+        self.rate_limiting_reporting_interval_sec = max(1.0, float(rate_limiting_reporting_interval_sec))
         self.use_sentry = use_sentry
         self.sentry_url = sentry_url
         self.test = test
@@ -278,7 +287,7 @@ def __init__(
         # Map of rule value -> token bucket
         self.rate_limiter = dict()
 
-        self.last_filter_maintenance = 0
+        self._last_filter_maintenance = 0
 
     def sigterm_handler(_signo, _stack_frame):
         self._exit()
@@ -1227,59 +1236,122 @@ def _send_enriched_data_to_kafka(self, tweet, enriched_data):
         future.add_callback(self.kafka_success_callback, tweet)
         future.add_errback(self.kafka_failure_callback)
 
-    def _filter_maintenance(self, expiration_age_sec=120):
-        expiration_time = time.time() - expiration_age_sec
-        for key, value in self.twitter_rate.items():
-            if value[-1] <= expiration_time:
-                del self.kafka_rate[key], self.rate_limiter[key], self.twitter_rate[key]
-                continue
-            while value and value[0] <= expiration_time:
-                value.popleft()
+    def _filter_maintenance(self, t_now=None, expiration_age_sec=60.0):
+        """
+        This examines each rule value we are tracking rates on for filtering and truncates
+        any data that is outside the active window for evaluation. If all rate data is outside
+        the window, then it means we haven't recently received traffic for that rule value and
+        we can stop tracking it.
+        :param t_now: The time at which to base maintenance from (defaults to the current time)
+        :type t_now: float
+        :param expiration_age_sec: The length of time data is valid
+        :type expiration_age_sec: float
+        """
+        expiration_time = (time.time() if t_now is None else t_now) - expiration_age_sec
+        keys = list(self.twitter_rate.keys())
+
+        for key in keys:
+            value = self.twitter_rate[key]
+
+            # If the most recent value is too old, stop tracking the value
+            if (value and value[-1] <= expiration_time) or not value:
+                if key in self.kafka_rate:
+                    del self.kafka_rate[key]
+
+                if key in self.rate_limiter:
+                    del self.rate_limiter[key]
+
+                if key in self.twitter_rate:
+                    del self.twitter_rate[key]
+            else:
+                # Drop old entries to stay within the expiration_age_sec
+                while value and value[0] <= expiration_time:
+                    value.popleft()
+
         for key, value in self.kafka_rate.items():
             while value and value[0] <= expiration_time:
                 value.popleft()
 
-    # add parameter for time range ev
-    # Average tps and unit test
-
-    def _log_rates(self, evaluation_window):
-        for key, value in self.twitter_rate.items():
-            # Edge cases
-            # IndexError: deque index out of range if value length is 0
-            # ZeroDivisionError: integer division or modulo by zero: if only one item in value
-            # ZeroDivisionError: integer division or modulo by zero: if value[-1] and value[0] are equal
-            if len(value) == 1:
-                tps = len(value)/evaluation_window # evaluation window
-                self.logger.info("Twitter Rate", extra=logExtra({
-                    'rule_value': key,
-                    'average_tps': tps
-                }))
-            elif len(value) > 0:
-                # Max min or mean?
-                # still haven't figured out the min or max
-                #first_window = time.time() - evaluation_window
-
-                average_tps = len(value) / evaluation_window
-                self.logger.info("Twitter Rate", extra=logExtra({
-                    'rule_value': key,
-                    'average_tps': average_tps
-                }))
+
+    def _compute_rates(self, data, t_now, evaluation_window_sec):
+        """
 
-        for key, value in self.kafka_rate.items():
-            if len(value) == 1:
-                tps = len(value)
-                self.logger.info("Kafka Rate", extra=logExtra({
-                    'rule_value': key,
-                    'tps': tps
-                }))
-            elif len(value) > 0:
-                tps = len(value) / (value[-1] - value[0])
-                self.logger.info("Kafka Rate", extra=logExtra({
-                    'rule_value': key,
-                    'tps': tps
-                }))
+        :param data: Map of rule value -> deque of tweet timestamps
+        :type data: dict
+        :param t_now: The time at which to base calculations from
+        :type t_now: float
+        :param evaluation_window_sec: The length of time data is evaluated
+        :type evaluation_window_sec: float
+        :return: Map of rule value -> computed count, average_tps, max_tps and min_tps
+        """
+        rates = dict()
+
+        if evaluation_window_sec <= 0.0:
+            return rates
+
+        t_start = t_now - evaluation_window_sec
+
+        for key, value in data.items():
+            count = len(value)
+            average_tps = float(count) / evaluation_window_sec
+
+            max_tps = average_tps
+            min_tps = average_tps
+
+            if count > 0:
+                second_buckets = dict()
+
+                for i in range(int(math.ceil(evaluation_window_sec))):
+                    second_buckets[i + int(math.floor(t_start))] = 0
+
+                for timestamp in value:
+                    key = int(timestamp - value[0])
+                    if key not in second_buckets:
+                        second_buckets[key] = 0
+                    second_buckets[key] += 1
+
+                for second, occurrences in second_buckets.items():
+                    max_tps = max(max_tps, float(occurrences))
+                    min_tps = min(min_tps, float(occurrences))
+
+            rates[key] = {
+                'count': count,
+                'max_tps': max_tps,
+                'average_tps': average_tps,
+                'min_tps': min_tps
+            }
+
+        return rates
+
+    def _log_rates(self, t_now, evaluation_window_sec):
+        """
+        This computes the rate of traffic per rule value and logs the metrics.
+        :param t_now: The time at which to base calculations from
+        :type t_now: float
+        :param evaluation_window_sec: The length of time data is evaluated
+        :type evaluation_window_sec: float
+        """
+
+        for key, value in self._compute_rates(self.twitter_rate, t_now, evaluation_window_sec).items():
+
+            self.logger.info("Twitter Rate", extra=logExtra(dict({
+                'rule_value': key
+            }, **value)))
+
+        for key, value in self._compute_rates(self.kafka_rate, t_now, evaluation_window_sec).items():
+
+            self.logger.info("Kafka Rate", extra=logExtra(dict({
+                'rule_value': key
+            }, **value)))
 
     def _is_filtered(self, enriched_data):
+        """
+        Tracks the volume of tweets per rule value received from Twitter and applies a token bucket
+        rate limiter to determine if we should send the tweet to Kafka.
+
+        :param enriched_data: The tweet with rule matches
+        :return: True if the tweet should be filtered out, False if it should be sent on to Kafka
+        """
+
         # set of keys -> rule_values
         rule_values = set()
 
@@ -1288,22 +1360,35 @@ def _is_filtered(self, enriched_data):
             if 'value' in rule:
                 rule_values.add(rule.get('value'))
 
-        filtered = True
+        filtered = list()
+        t_now = time.time()
+
         for key in rule_values:
-            if key not in self.rate_limiter:
+
+            if self.rate_limiting_enabled and key not in self.rate_limiter:
+                # Initialize a limiter for the untracked rule value
                 storage = token_bucket.MemoryStorage()
-                limiter = token_bucket.Limiter(settings.RATE_LIMITING_RATE_SEC, settings.RATE_LIMITING_CAPACITY, storage)
+                limiter = token_bucket.Limiter(self.rate_limiting_rate_sec, self.rate_limiting_capacity, storage)
                 self.rate_limiter[key] = limiter
+
             if key not in self.twitter_rate:
                 self.twitter_rate[key] = deque()
-            self.twitter_rate[key].append(time.time())
-            if self.rate_limiter[key].consume(key):
+
+            self.twitter_rate[key].append(t_now)
+
+            # Do we have enough token bucket credits (under the limit) to send the tweet?
+            if not self.rate_limiting_enabled or self.rate_limiter[key].consume(key):
+
                 if key not in self.kafka_rate:
                     self.kafka_rate[key] = deque()
-                self.kafka_rate[key].append(time.time())
-                filtered = False
-        return filtered
+
+                self.kafka_rate[key].append(t_now)
+                filtered.append(False)
+            else:
+                filtered.append(True)
+
+        # Ensure we don't filter tweets without any rules
+        return len(filtered) != 0 and all(filtered)
 
     def _main_loop(self):
         """
@@ -1354,10 +1439,12 @@ def _main_loop(self):
             else:
                 self.logger.info("Stream keep-alive received", extra=logExtra())
 
-            if time.time() > self.last_filter_maintenance + settings.RATE_LIMITING_REPORTING_INTERVAL_SEC:
-                self.last_filter_maintenance = time.time()
-                self._filter_maintenance(settings.RATE_LIMITING_REPORTING_INTERVAL_SEC)
-                self.log_tweet_rates(settings.RATE_LIMITING_REPORTING_INTERVAL_SEC)
+            t_now = time.time()
+
+            if t_now > self.last_filter_maintenance + self.rate_limiting_reporting_interval_sec:
+                self._log_rates(t_now, t_now - self._last_filter_maintenance)
+                self._filter_maintenance(t_now, 0.0)
+                self._last_filter_maintenance = t_now
 
             if self.exit:
                 break
@@ -1700,7 +1787,19 @@ def main():
         )),
         heartbeat_interval=int(getAppParamStr(
             'HEARTBEAT_INTERVAL', '0', args.heartbeat
-        ))
+        )),
+        rate_limiting_enabled=str2bool(getAppParamStr(
+            'RATE_LIMITING_ENABLED', settings.RATE_LIMITING_ENABLED
+        )),
+        rate_limiting_rate_sec=float(getAppParamStr(
+            'RATE_LIMITING_RATE_SEC', settings.RATE_LIMITING_RATE_SEC
+        )),
+        rate_limiting_capacity=int(getAppParamStr(
+            'RATE_LIMITING_CAPACITY', settings.RATE_LIMITING_CAPACITY
+        )),
+        rate_limiting_reporting_interval_sec=float(getAppParamStr(
+            'RATE_LIMITING_REPORTING_INTERVAL_SEC', settings.RATE_LIMITING_REPORTING_INTERVAL_SEC
+        )),
     )
 
     # Ensure we setup our CONSTS before we start actually doing things with threads
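The per-second bucketing that _compute_rates introduces here is easier to follow with concrete numbers. Below is a rough standalone sketch of the same math as it behaves once the indexing fixes from the later patches in this series land; compute_rates_sketch is an illustrative name, not part of any patch. Average TPS divides the event count by the whole evaluation window, while max and min TPS come from the busiest and quietest one-second bucket.

from collections import deque


def compute_rates_sketch(timestamps, evaluation_window_sec):
    """Count events per elapsed second, then derive average/max/min TPS."""
    count = len(timestamps)
    average_tps = count / float(evaluation_window_sec)
    max_tps = min_tps = average_tps

    if count > 0:
        # One bucket per whole second of the window, keyed by elapsed second
        buckets = dict((i, 0) for i in range(int(evaluation_window_sec)))
        for t in timestamps:
            second = int(t - timestamps[0])
            buckets[second] = buckets.get(second, 0) + 1
        for occupancy in buckets.values():
            max_tps = max(max_tps, float(occupancy))
            min_tps = min(min_tps, float(occupancy))

    return {'count': count, 'average_tps': average_tps,
            'max_tps': max_tps, 'min_tps': min_tps}


# Mirrors the reworked unit tests: three timestamps in a 4 second window
print(compute_rates_sketch(deque([1, 2, 3]), 4.0))
# {'count': 3, 'average_tps': 0.75, 'max_tps': 1.0, 'min_tps': 0.0}

A single event in a 4 second window therefore reports an average of 0.25 rather than 1.0, which is what the reworked length-one test above asserts.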
From c032704c2586dd35a89a376a6fe4b88212282c57 Mon Sep 17 00:00:00 2001
From: Andrew Carter
Date: Tue, 23 Mar 2021 15:13:52 -0400
Subject: [PATCH 06/14] bugs

---
 traptor/traptor.py | 32 +++++++++++++++++---------------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/traptor/traptor.py b/traptor/traptor.py
index 7afcf8f..b5264d0 100644
--- a/traptor/traptor.py
+++ b/traptor/traptor.py
@@ -287,7 +287,7 @@ def __init__(
         # Map of rule value -> token bucket
         self.rate_limiter = dict()
 
-        self._last_filter_maintenance = 0
+        self._last_filter_maintenance = -1
 
     def sigterm_handler(_signo, _stack_frame):
         self._exit()
@@ -1230,11 +1230,15 @@ def _send_enriched_data_to_kafka(self, tweet, enriched_data):
         """
         theLogMsg = "Attempting to send tweet to kafka"
         self.logger.info(theLogMsg, extra=logExtra({
-          'tweet_id': tweet.get('id_str', None)
+            'tweet_id': tweet.get('id_str', None)
         }))
-        future = self.kafka_conn.send(self.kafka_topic, enriched_data)
-        future.add_callback(self.kafka_success_callback, tweet)
-        future.add_errback(self.kafka_failure_callback)
+
+        try:
+            future = self.kafka_conn.send(self.kafka_topic, enriched_data)
+            future.add_callback(self.kafka_success_callback, tweet)
+            future.add_errback(self.kafka_failure_callback)
+        except Exception as e:
+            self.logger.error("Kafka failed", extra=logExtra(e))
 
     def _filter_maintenance(self, t_now=None, expiration_age_sec=60.0):
         """
@@ -1288,8 +1292,6 @@ def _compute_rates(self, data, t_now, evaluation_window_sec):
         if evaluation_window_sec <= 0.0:
             return rates
 
-        t_start = t_now - evaluation_window_sec
-
         for key, value in data.items():
             count = len(value)
             average_tps = float(count) / evaluation_window_sec
@@ -1301,13 +1303,13 @@ def _compute_rates(self, data, t_now, evaluation_window_sec):
             second_buckets = dict()
 
             for i in range(int(math.ceil(evaluation_window_sec))):
-                second_buckets[i + int(math.floor(t_start))] = 0
+                second_buckets[i] = 0
 
             for timestamp in value:
-                key = int(timestamp - value[0])
-                if key not in second_buckets:
-                    second_buckets[key] = 0
-                second_buckets[key] += 1
+                second = int(timestamp - value[0])
+                if second not in second_buckets:
+                    second_buckets[second] = 0
+                second_buckets[second] += 1
 
             for second, occurrences in second_buckets.items():
                 max_tps = max(max_tps, float(occurrences))
@@ -1412,7 +1414,7 @@ def _main_loop(self):
                                      tags=['error_type:json_loads_error'])
             else:
                 theLogMsg = "Enriching Tweet"
-                self.logger.info(theLogMsg, extra=logExtra({
+                self.logger.debug(theLogMsg, extra=logExtra({
                     'tweet_id': tweet.get('id_str', None)
                 }))
                 enriched_data = self._enrich_tweet(tweet)
@@ -1441,8 +1443,8 @@ def _main_loop(self):
 
             t_now = time.time()
 
-            if t_now > self.last_filter_maintenance + self.rate_limiting_reporting_interval_sec:
-                self._log_rates(t_now, t_now - self._last_filter_maintenance)
+            if t_now > self._last_filter_maintenance + self.rate_limiting_reporting_interval_sec:
+                self._log_rates(t_now, min(t_now - self._last_filter_maintenance, 2 * self.rate_limiting_reporting_interval_sec))
                 self._filter_maintenance(t_now, 0.0)
                 self._last_filter_maintenance = t_now

From 1a61e664a4342cf67e932e8272d58e13526264fb Mon Sep 17 00:00:00 2001
From: Andrew Carter
Date: Tue, 23 Mar 2021 15:35:57 -0400
Subject: [PATCH 07/14] Key error fix

---
 traptor/traptor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/traptor/traptor.py b/traptor/traptor.py
index b5264d0..b86c311 100644
--- a/traptor/traptor.py
+++ b/traptor/traptor.py
@@ -1357,7 +1357,7 @@ def _is_filtered(self, enriched_data):
         # set of keys -> rule_values
         rule_values = set()
 
-        if 'collection_rules' in enriched_data['traptor']:
+        if 'traptor' in enriched_data and 'collection_rules' in enriched_data['traptor']:
             for rule in enriched_data['traptor']['collection_rules']:
                 if 'value' in rule:
                     rule_values.add(rule.get('value'))

From 4775635290689f23f56d4a551366a911242469c7 Mon Sep 17 00:00:00 2001
From: Andrew Carter
Date: Tue, 23 Mar 2021 20:59:33 -0400
Subject: [PATCH 08/14] Fixes during testing

---
 traptor/traptor.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/traptor/traptor.py b/traptor/traptor.py
index b86c311..81e1105 100644
--- a/traptor/traptor.py
+++ b/traptor/traptor.py
@@ -1292,8 +1292,16 @@ def
_compute_rates(self, data, t_now, evaluation_window_sec): if evaluation_window_sec <= 0.0: return rates + t_start = t_now - evaluation_window_sec + for key, value in data.items(): - count = len(value) + + current_data = deque(value) + + while current_data and current_data[0] < t_start: + current_data.popleft() + + count = len(current_data) average_tps = float(count) / evaluation_window_sec max_tps = average_tps @@ -1305,8 +1313,8 @@ def _compute_rates(self, data, t_now, evaluation_window_sec): for i in range(int(math.ceil(evaluation_window_sec))): second_buckets[i] = 0 - for timestamp in value: - second = int(timestamp - value[0]) + for timestamp in current_data: + second = int(timestamp - current_data[0]) if second not in second_buckets: second_buckets[second] = 0 second_buckets[second] += 1 @@ -1445,7 +1453,7 @@ def _main_loop(self): if t_now > self._last_filter_maintenance + self.rate_limiting_reporting_interval_sec: self._log_rates(t_now, min(t_now - self._last_filter_maintenance, 2 * self.rate_limiting_reporting_interval_sec)) - self._filter_maintenance(t_now, 0.0) + self._filter_maintenance(t_now, self.rate_limiting_reporting_interval_sec) self._last_filter_maintenance = t_now if self.exit: From 32a65cee1be8bd9ceddc1c9b9a5e6964eb56c347 Mon Sep 17 00:00:00 2001 From: Andrew Carter Date: Tue, 23 Mar 2021 21:28:58 -0400 Subject: [PATCH 09/14] Fixing min_tps by dropping extraneous bucket --- traptor/traptor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/traptor/traptor.py b/traptor/traptor.py index 81e1105..615b201 100644 --- a/traptor/traptor.py +++ b/traptor/traptor.py @@ -1310,7 +1310,7 @@ def _compute_rates(self, data, t_now, evaluation_window_sec): if count > 0: second_buckets = dict() - for i in range(int(math.ceil(evaluation_window_sec))): + for i in range(int(evaluation_window_sec)): second_buckets[i] = 0 for timestamp in current_data: From f8d68f9e95db63c013b99c48926d876cd83c5bdc Mon Sep 17 00:00:00 2001 From: Andrew Carter Date: Tue, 23 Mar 2021 21:43:32 -0400 Subject: [PATCH 10/14] Using version variable in test assert --- tests/test_traptor_offline.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py index 8bf41ff..e482655 100644 --- a/tests/test_traptor_offline.py +++ b/tests/test_traptor_offline.py @@ -14,6 +14,7 @@ import mockredis from tenacity import wait_none +import version from traptor.birdy.twitter import TwitterApiError, TwitterAuthError from traptor.traptor import Traptor from traptor.traptor_birdy import TraptorBirdyClient @@ -647,24 +648,24 @@ def test_rate_logger(self, traptor): traptor.twitter_rate['test'] = deque([1, 2, 3]) traptor.logger = MagicMock() traptor._log_rates(4.0, 4.0) - assert traptor.logger.method_calls ==[call.info('Twitter Rate', extra={'count': 3, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 2, 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.75, 'traptor_version': '4.0.6.2'}), - call.info('Kafka Rate', extra={'count': 3, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 2, 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.75, 'traptor_version': '4.0.6.2'})] + assert traptor.logger.method_calls ==[call.info('Twitter Rate', extra={'count': 3, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.75, 'traptor_version': version.__version__}), + call.info('Kafka 
Rate', extra={'count': 3, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.75, 'traptor_version': version.__version__})] def test_rate_logger_out_of_range(self, traptor): traptor.kafka_rate['test'] = deque() traptor.twitter_rate['test'] = deque() traptor.logger = MagicMock() traptor._log_rates(4.0, 4.0) - assert traptor.logger.method_calls == [call.info('Twitter Rate', extra={'count': 0, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 0.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.0, 'traptor_version': '4.0.6.2'}), - call.info('Kafka Rate', extra={'count': 0, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 0.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.0, 'traptor_version': '4.0.6.2'})] + assert traptor.logger.method_calls == [call.info('Twitter Rate', extra={'count': 0, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 0.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.0, 'traptor_version': version.__version__}), + call.info('Kafka Rate', extra={'count': 0, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 0.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.0, 'traptor_version': version.__version__})] def test_rate_logger_length_one(self, traptor): traptor.kafka_rate['test'] = deque([2]) traptor.twitter_rate['test'] = deque([2]) traptor.logger = MagicMock() traptor._log_rates(4.0, 4.0) - assert traptor.logger.method_calls == [call.info('Twitter Rate', extra={'count': 1, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 0, 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.25, 'traptor_version': '4.0.6.2'}), - call.info('Kafka Rate', extra={'count': 1, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 0, 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.25, 'traptor_version': '4.0.6.2'})] + assert traptor.logger.method_calls == [call.info('Twitter Rate', extra={'count': 1, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.25, 'traptor_version': version.__version__}), + call.info('Kafka Rate', extra={'count': 1, 'tags': ['traptor_type:None', 'traptor_id:None'], 'rule_value': 'test', 'max_tps': 1.0, 'component': 'traptor', 'min_tps': 0.0, 'average_tps': 0.25, 'traptor_version': version.__version__})] def test_filter_maintenance(self, traptor): now = time.time() From 8a96314df09b9cafa666b6e1e83a7de425c63173 Mon Sep 17 00:00:00 2001 From: Andrew Carter Date: Tue, 23 Mar 2021 21:45:07 -0400 Subject: [PATCH 11/14] Using version variable in test assert --- tests/test_traptor_offline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py index e482655..1566b2c 100644 --- a/tests/test_traptor_offline.py +++ b/tests/test_traptor_offline.py @@ -14,7 +14,7 @@ import mockredis from tenacity import wait_none -import version +from traptor import version from traptor.birdy.twitter import TwitterApiError, TwitterAuthError from traptor.traptor import Traptor from traptor.traptor_birdy import TraptorBirdyClient From 11e74a07efeb6310bf45b7322fe262eb38532051 Mon Sep 17 00:00:00 2001 From: Andrew Carter Date: Tue, 23 Mar 2021 21:51:06 -0400 Subject: [PATCH 12/14] Relaxed assert on timing 
dependent test

---
 tests/test_traptor_offline.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_traptor_offline.py b/tests/test_traptor_offline.py
index 1566b2c..d2d8b04 100644
--- a/tests/test_traptor_offline.py
+++ b/tests/test_traptor_offline.py
@@ -693,7 +693,7 @@ def test_is_filtered_one_rule_value(self, traptor):
             time.sleep(.01)
 
         assert len(traptor.twitter_rate['air force']) == 100
-        assert len(traptor.kafka_rate['air force']) == 21
+        assert 20 <= len(traptor.kafka_rate['air force']) <= 22
 
     def test_is_filtered_dummy(self, traptor):
         enriched_data = {
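The relaxed bound follows directly from the token bucket arithmetic. With the constructor defaults the test relies on (10 tokens per second, capacity 10), a back-of-the-envelope check, approximate because the loop's true duration depends on scheduler jitter:

# 100 tweets spaced ~10ms apart arrive over roughly one second.
capacity = 10           # the initial burst the bucket absorbs
rate_sec = 10.0         # tokens refilled per second while the test runs
run_time = 100 * 0.01   # ~1.0s in theory, slightly longer in practice

expected = capacity + rate_sec * run_time
print(expected)  # 20.0, hence the 20 <= passed <= 22 window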
"9300:9300" + environment: + ES_JAVA_OPTS: "-Xms256m -Xmx256m" + ulimits: + memlock: + soft: -1 + hard: -1 logstash: - image: logstash:2.4.0-1 + image: docker.elastic.co/logstash/logstash:7.12.0 command: logstash -f /etc/logstash/conf.d/logstash.conf volumes: - ./logstash/traptor-logstash.conf:/etc/logstash/conf.d/logstash.conf - ./logstash/logs-template.json:/etc/logstash/templates/logs-template.json - - ./logs:/var/log/traptor + - logs:/var/log/traptor kibana: - image: kibana:5.0 + image: docker.elastic.co/kibana/kibana:7.12.0 + environment: + - ELASTICSEARCH_HOSTS=http://elasticsearch:9200 + - XPACK_SECURITY_ENABLED=False + - XPACK_MONITORING_ENABLED=False + - XPACK_GRAPH_ENABLED=False + - XPACK_REPORTING_ENABLED=False ports: - "5601:5601" - environment: - - ELASTICSEARCH_URL=http://elasticsearch:9200 depends_on: - elasticsearch @@ -147,3 +174,6 @@ services: - ./logs:/var/log/twitterapi restart: always command: python traptor/manager/run.py + +volumes: + logs: \ No newline at end of file diff --git a/logstash/logs-template.json b/logstash/logs-template.json index 6c7ad53..1e8064b 100644 --- a/logstash/logs-template.json +++ b/logstash/logs-template.json @@ -1,46 +1,12 @@ { - "template" : "logs-*", + "template" : "traptor-*", "order" : 0, "settings" : { "index.refresh_interval" : "5s", - "number_of_shards" : "5", + "number_of_shards" : "1", "number_of_replicas": "0" }, "mappings" : { - "_default_" : { - "dynamic_templates" : [ { - "message_field" : { - "mapping" : { - "index" : "not_analyzed", - "omit_norms" : true, - "include_in_parent": true, - "type" : "string" - }, - "match_mapping_type" : "string", - "match" : "message" - } - }, { - "string_fields" : { - "mapping" : { - "index" : "not_analyzed", - "omit_norms" : true, - "include_in_parent": true, - "type" : "string" - }, - "match_mapping_type" : "string", - "match" : "*" - } - } ], - "properties" : { - "@version" : { - "index" : "not_analyzed", - "type" : "string" - } - }, - "_all" : { - "enabled" : true - } - } }, "aliases" : { } } \ No newline at end of file diff --git a/logstash/traptor-logstash.conf b/logstash/traptor-logstash.conf index e65fef9..dccde44 100644 --- a/logstash/traptor-logstash.conf +++ b/logstash/traptor-logstash.conf @@ -13,8 +13,7 @@ output { template => "/etc/logstash/templates/logs-template.json" template_name => "logs-*" template_overwrite => true - index => "logs-traptor-%{+YYYY.MM.dd}" - document_type => "%{[logger]}" + index => "traptor-%{+YYYY.MM.dd}" } } } \ No newline at end of file diff --git a/scripts/add_rules.py b/scripts/add_rules.py new file mode 100644 index 0000000..e5bfc00 --- /dev/null +++ b/scripts/add_rules.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# encoding: utf-8 +import redis + +location_rules = [{ + "status":"active", + "rule_type":"locations", + "orig_type":"geo", + "description":"Tweets from CONUS", + "project_version_id":"None", + "campaign_id":"default", + "value":"-125.551758,24.726875,-66.401367,49.439557", + "tag":"Pulse.united_states", + "appid":"ist-dev", + "date_added":"2021-01-19 18:37:28", + "project_id":"default", + "rule_id":"1", + "node_id":"None" +}] + +track_rules = [ + { + "status":"active", + "rule_type":"track", + "orig_type":"keyword", + "description":"test", + "project_version_id":"None", + "campaign_id":"default", + "value":"usa", + "tag":"Pulse.united_states", + "appid":"ist-dev", + "date_added":"2021-01-19 18:37:28", + "project_id":"default", + "rule_id":"10", + "node_id":"None" + }, + { + "status":"active", + "rule_type":"track", + "orig_type":"keyword", + 
"description":"news", + "project_version_id":"None", + "campaign_id":"default", + "value":"news", + "tag":"noise", + "appid":"ist-dev", + "date_added":"2021-01-19 18:37:28", + "project_id":"default", + "rule_id":"11", + "node_id":"None" + } +] + +follow_rules = [{ + "status":"active", + "rule_type":"follow", + "orig_type":"username", + "description":"news", + "project_version_id":"None", + "campaign_id":"default", + "value":"cnn", + "tag":"news", + "appid":"ist-dev", + "date_added":"2021-01-19 18:37:28", + "project_id":"default", + "rule_id":"20", + "node_id":"None" +}] + +r = redis.Redis('localhost', db=1) +r.info() + +traptor_id = 0 + +for item in location_rules: + r.hmset('traptor-locations:{}:{}'.format(traptor_id, item['rule_id']), item) + +for item in track_rules: + r.hmset('traptor-track:{}:{}'.format(traptor_id, item['rule_id']), item) + +for item in follow_rules: + r.hmset('traptor-follow:{}:{}'.format(traptor_id, item['rule_id']), item) From 4835b329c731ca074f6df551e3ce2a5a4aba2804 Mon Sep 17 00:00:00 2001 From: Andrew Carter Date: Wed, 24 Mar 2021 12:24:56 -0400 Subject: [PATCH 14/14] Added mode --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index 14ef759..5a6e7e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: - redis - kafka environment: + - TRAPTOR_TYPE=track - KAFKA_HOSTS=kafka:9092 - KAFKA_TOPIC=traptor - REDIS_HOST=redis