Merge "config/cassandra: use eventloop to bound queries"
Jenkins CI authored and opencontrail-ci-admin committed Sep 4, 2020
2 parents 9a4c944 + ff275d8 commit 65369d3
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions src/config/common/cfgm_common/datastore/drivers/cassandra_cql.py
@@ -512,7 +512,8 @@ def ensure_keyspace_replication(self, keyspace,
msg.format(keyspace, props), level=SandeshLevel.SYS_NOTICE)

def _cql_select(self, cf_name, key, start='', finish='', limit=None,
columns=None, include_timestamp=False, decode_json=None):
columns=None, include_timestamp=False, decode_json=None,
use_async=False):
ses = self.get_cf(cf_name)
arg, cql = [StringType(key)], """
SELECT blobAsText(column1), value, WRITETIME(value)
@@ -535,17 +536,29 @@ def _cql_select(self, cf_name, key, start='', finish='', limit=None,
# automatically.
decode_json = ses.keyspace.endswith(
datastore_api.UUID_KEYSPACE_NAME)
return Iter(ses.execute(pre.bind(arg)),
# Filtering the columns in Cassandra degrades
# performance, so let the Python process do that
# job locally instead; see: ALLOW FILTERING.
columns=columns,
include_timestamp=include_timestamp,
decode_json=decode_json,
logger=self.options.logger,
key=key,
cf_name=cf_name)

def iterize(r):
return Iter(r,
# Filtering the columns in Cassandra degrades
# performance, so let the Python process do that
# job locally instead; see: ALLOW FILTERING.
columns=columns,
include_timestamp=include_timestamp,
decode_json=decode_json,
logger=self.options.logger,
key=key,
cf_name=cf_name)
if use_async:
future = ses.execute_async(pre.bind(arg))
# When using 'use_async=True', the result is obtained
# by calling the function that is returned, e.g.:
# f = drv.multiget(keys, ..., use_async=True)
# ...
# result = f()
return lambda: iterize(future.result())
else:
return iterize(ses.execute(pre.bind(arg)))

def _Get_CF_Batch(self, cf_name, keyspace_name=None):
return self.BatchClass(context=self, cf_name=cf_name)
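
The new use_async path relies on the DataStax cassandra-driver pattern in which session.execute_async() returns a ResponseFuture immediately and ResponseFuture.result() blocks only when it is called, so several queries can be in flight at once. Below is a minimal stand-alone sketch of that pattern; the contact point, keyspace, table, and key names are illustrative and not taken from this driver.

    from cassandra.cluster import Cluster

    cluster = Cluster(['127.0.0.1'])              # illustrative contact point
    session = cluster.connect('example_keyspace')  # illustrative keyspace
    prepared = session.prepare(
        "SELECT column1, value FROM example_table WHERE key = ?")

    keys = ['uuid-1', 'uuid-2', 'uuid-3']          # illustrative row keys
    # Phase 1: start every query; execute_async() does not block.
    futures = [session.execute_async(prepared.bind([key])) for key in keys]
    # Phase 2: collect results; each .result() blocks only for its own query.
    rows = {key: list(future.result()) for key, future in zip(keys, futures)}
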
@@ -557,13 +570,19 @@ def _Multiget(self, cf_name, keys, columns=None, start='', finish='',
except (ValueError, TypeError):
num_columns = MAX_COLUMNS
res = {}
# TODO(sahid): An important optimisation is to execute the
# requests in parallel. See: future/asyncio.

futures = []
for key in keys:
row = self._cql_select(
cf_name, key=key, start=start, finish=finish,
columns=columns, include_timestamp=timestamp,
limit=num_columns).all()
# Issue the queries without blocking; results are gathered below.
futures.append(
(key,
self._cql_select(
cf_name, key=key, start=start, finish=finish,
columns=columns, include_timestamp=timestamp,
limit=num_columns, use_async=True)))
for key, future in futures:
# Retrieve the result (blocks until this query completes).
row = future().all()
if row:
# We would have used a generator here, but legacy code
# does not handle it.
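
With the two loops above, _Multiget now follows a fan-out/gather shape: start every per-key query first (nothing blocks), then resolve each deferred result exactly once, so wall-clock latency is close to the slowest single query rather than the sum of all of them. A small sketch of that shape, under the assumption that select_async stands in for self._cql_select(..., use_async=True), i.e. a call that returns a callable wrapping ResponseFuture.result():

    from typing import Callable, Dict, Iterable, List

    def multiget_fanout(select_async: Callable[[str], Callable[[], List]],
                        keys: Iterable[str]) -> Dict[str, List]:
        # Phase 1: launch all queries; each select_async() call returns at once.
        pending = [(key, select_async(key)) for key in keys]
        # Phase 2: block on each deferred result; total latency is roughly the
        # slowest single query instead of the sum of all of them.
        return {key: deferred() for key, deferred in pending}
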
