Skip to content

Commit

Permalink
Corrected error handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
dstore-dbap committed Nov 30, 2015
1 parent 61ad513 commit 4ef6a42
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions gambolputty/input/ElasticSearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import types
import requests
import urllib3
from elasticsearch import Elasticsearch, connection
import BaseThreadedModule
import Utils
Expand Down Expand Up @@ -108,16 +109,32 @@ def configure(self, configuration):
# Set requests log level.
logging.getLogger('requests').setLevel(logging.WARNING)
self.can_run_forked = True
self.shared_scroll_id = self.manager.Value(c_char_p, self.getInitalialScrollId())
scroll_id = self.getInitalialScrollId()
if not scroll_id:
self.gp.shutDown()
self.shared_scroll_id = self.manager.Value(c_char_p, scroll_id)
elif self.search_type == 'normal':
self.query_from = 0
self.query = json.loads(self.query)
self.query['size'] = self.batch_size

def getInitalialScrollId(self):
response = requests.get('http://%s/%s/_search?search_type=scan&scroll=1m&size=%s' % (self.es_nodes[0], self.index_name, self.batch_size), data=self.query)
results = json.loads(response.text)
return results['_scroll_id']
scroll_id = None
try:
response = requests.get('http://%s/%s/_search?search_type=scan&scroll=1m&size=%s' % (self.es_nodes[0], self.index_name, self.batch_size), data=self.query)
except:
etype, evalue, etb = sys.exc_info()
self.logger.error("Could not initialize scan search. Exception: %s, Error: %s." % (etype, evalue))
try:
results = json.loads(response.text)
except:
etype, evalue, etb = sys.exc_info()
self.logger.error("Could not parse query response. Exception: %s, Error: %s." % (etype, evalue))
if '_scroll_id' in results:
scroll_id = results['_scroll_id']
else:
self.logger.error("Could not get initial scroll id. Response: %s." % results)
return scroll_id

def initAfterFork(self):
BaseThreadedModule.BaseThreadedModule.initAfterFork(self)
Expand Down

0 comments on commit 4ef6a42

Please sign in to comment.