From 34f7454aea02e0b776ad1207ab81557221fcc10b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patrick=20Sodr=C3=A9?=
Date: Thu, 16 Jul 2020 22:13:10 -0400
Subject: [PATCH 1/6] Add initial support for partitioned sequence

---
 intake_solr/source.py | 63 ++++++++++++++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 15 deletions(-)

diff --git a/intake_solr/source.py b/intake_solr/source.py
index 60eea0a..095d220 100644
--- a/intake_solr/source.py
+++ b/intake_solr/source.py
@@ -1,3 +1,5 @@
+import math
+
 from intake.source import base
 import pandas as pd
 import pysolr
@@ -28,18 +30,24 @@ class SOLRSequenceSource(base.DataSource):
     zoocollection: bool or str
         If using Zookeeper to orchestrate SOLR, this is the name of the
         collection to connect to.
+    partition_len: int or None
+        The desired partition size. [default: 1024]
     """
     container = 'python'
     name = 'solr'
     version = __version__
-    partition_access = False
 
     def __init__(self, query, base_url, core, qargs=None, metadata=None,
-                 auth=None, cert=None, zoocollection=False):
+                 auth=None, cert=None, zoocollection=False,
+                 partition_len=1024):
         self.query = query
         self.qargs = qargs or {}
         self.metadata = metadata or {}
         self._schema = None
+        self.partition_len = partition_len
+
+        if partition_len and partition_len <= 0:
+            raise ValueError(f"partition_len must be None or positive, got {partition_len}")
         if auth == 'kerberos':
             from requests_kerberos import HTTPKerberosAuth, OPTIONAL
             auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
@@ -64,24 +72,49 @@ def __init__(self, query, base_url, core, qargs=None, metadata=None,
         super(SOLRSequenceSource, self).__init__(metadata=metadata)
 
     def _get_schema(self):
-        return base.Schema(datashape=None,
-                           dtype=None,
-                           shape=None,
-                           npartitions=1,
-                           extra_metadata={})
+        """Do a 0 row query and get the number of hits from the response"""
+        qargs = self.qargs.copy()
+        qargs["rows"] = 0
+        start = qargs.get("start", 0)
+        results = self.solr.search(self.query, **qargs)
+
+        if self.partition_len is None:
+            npartitions = 1
+        else:
+            npartitions = math.ceil((results.hits - start) / self.partition_len)
 
-    def _do_query(self):
+        return base.Schema(
+            datashape=None,
+            dtype=None,
+            shape=(results.hits - start,),
+            npartitions=npartitions,
+            extra_metadata={},
+        )
+
+    def _do_query(self, index):
+        qargs = self.qargs.copy()
+        if self.partition_len is not None:
+            qargs["start"] = qargs.get("start", 0) + index * self.partition_len
+            qargs["rows"] = self.partition_len
+        return self.solr.search(self.query, **qargs)
+
+    def _get_partition(self, index):
+        """Downloads all data in query response"""
+        solr_rv = self._do_query(index)
         out = []
-        data = self.solr.search(self.query, **self.qargs).docs
-        for d in data:
+        for d in solr_rv.docs:
             out.append({k: (v[0] if isinstance(v, (tuple, list)) else v)
                         for k, v in d.items()})
         return out
 
-    def _get_partition(self, _):
-        """Downloads all data
-        """
-        return self._do_query()
+    def to_dask(self):
+        from dask import delayed
+        import dask.bag
+
+        npartitions = self.discover()["npartitions"]
+        return dask.bag.from_delayed(
+            [delayed(self.read_partition)(i) for i in range(npartitions)]
+        )
 
 
 class SOLRTableSource(SOLRSequenceSource):
@@ -129,7 +162,7 @@ def _get_partition(self, _):
         """Downloads all data
         """
         if not hasattr(self, '_dataframe'):
-            df = pd.DataFrame(self._do_query())
+            df = pd.DataFrame(super()._get_partition(_))
             self._dataframe = df
             self._schema = None
             self.discover()
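
A minimal sketch of how the partitioned sequence source introduced in PATCH 1/6 might
be used once applied. The base URL, core name, and query below are made up for
illustration; discover() and read_partition() are inherited from intake's DataSource
base class.

    from intake_solr import SOLRSequenceSource

    # Hypothetical connection details, adjust to your SOLR deployment.
    source = SOLRSequenceSource(
        query="*:*",
        base_url="http://localhost:8983/solr",
        core="mycore",
        partition_len=1024,           # one SOLR request per 1024 documents
    )

    info = source.discover()          # issues a rows=0 query to count hits
    docs = source.read_partition(0)   # list of dicts for the first partition
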
From 77b6d141bdbb2d752278bc05d9e09f72d0f36b0f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patrick=20Sodr=C3=A9?=
Date: Thu, 16 Jul 2020 23:23:32 -0400
Subject: [PATCH 2/6] Add partitioning for SOLRTable

---
 intake_solr/__init__.py |  3 ---
 intake_solr/source.py   | 53 +++++++++++++++++++++++------------------
 2 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/intake_solr/__init__.py b/intake_solr/__init__.py
index b49f9f9..7cb4e8f 100644
--- a/intake_solr/__init__.py
+++ b/intake_solr/__init__.py
@@ -1,7 +1,4 @@
 from ._version import get_versions
 __version__ = get_versions()['version']
 del get_versions
-
-import intake  # Import this first to avoid circular imports during discovery.
-del intake
 from .source import SOLRSequenceSource, SOLRTableSource
diff --git a/intake_solr/source.py b/intake_solr/source.py
index 095d220..4d4785d 100644
--- a/intake_solr/source.py
+++ b/intake_solr/source.py
@@ -36,6 +36,7 @@ class SOLRSequenceSource(base.DataSource):
     container = 'python'
     name = 'solr'
     version = __version__
+    partition_access = True
 
     def __init__(self, query, base_url, core, qargs=None, metadata=None,
                  auth=None, cert=None, zoocollection=False,
                  partition_len=1024):
@@ -111,9 +112,9 @@ def to_dask(self):
         from dask import delayed
         import dask.bag
 
-        npartitions = self.discover()["npartitions"]
+        self._load_metadata()
         return dask.bag.from_delayed(
-            [delayed(self.read_partition)(i) for i in range(npartitions)]
+            [delayed(self.read_partition)(i) for i in range(self.npartitions)]
         )
 
 
@@ -141,32 +142,38 @@ class SOLRTableSource(SOLRSequenceSource):
     zoocollection: bool or str
         If using Zookeeper to orchestrate SOLR, this is the name of the
         collection to connect to.
+    partition_len: int or None
+        The desired partition size. [default: 1024]
     """
 
     name = 'solrtab'
     container = 'dataframe'
+    partition_access = True
 
     def _get_schema(self, retry=2):
         """Get schema from first 10 hits or cached dataframe"""
-        if not hasattr(self, '_dataframe'):
-            self._get_partition(0)
-        dtype = {k: str(v)
-                 for k, v in self._dataframe.dtypes.to_dict().items()}
-        return base.Schema(datashape=None,
-                           dtype=dtype,
-                           shape=self._dataframe.shape,
-                           npartitions=1,
-                           extra_metadata={})
-
-    def _get_partition(self, _):
-        """Downloads all data
+        schema = super()._get_schema()
+
+        df = self._get_partition(0)
+        schema["dtype"] = {k: str(v)
+                           for k, v in df.dtypes.to_dict().items()}
+        schema["shape"] = (schema["shape"][0], *df.shape[1:])
+        return schema
+
+    def _get_partition(self, index):
+        """Downloads all data in the partition
         """
-        if not hasattr(self, '_dataframe'):
-            df = pd.DataFrame(super()._get_partition(_))
-            self._dataframe = df
-            self._schema = None
-            self.discover()
-        return self._dataframe
-
-    def _close(self):
-        self._dataframe = None
+        seq = super()._get_partition(index)
+        # Columns are sorted unless the user defines the field list (fl)
+        columns = self.qargs["fl"] if "fl" in self.qargs else sorted(seq[0].keys())
+        return pd.DataFrame(seq, columns=columns)
+
+    def to_dask(self):
+        from dask import delayed
+        import dask.dataframe
+
+        self._load_metadata()
+        return dask.dataframe.from_delayed(
+            [delayed(self.read_partition)(i) for i in range(self.npartitions)]
+        )
+
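
A rough sketch, under the same made-up connection details as above, of the partitioned
table source after PATCH 2/6: to_dask() now returns a dask DataFrame with one pandas
partition per SOLR page, and columns are sorted field names unless qargs defines "fl".

    from intake_solr import SOLRTableSource

    source = SOLRTableSource(
        query="*:*",
        base_url="http://localhost:8983/solr",
        core="mycore",
        partition_len=1024,
    )

    ddf = source.to_dask()                   # dask DataFrame built from delayed partitions
    first = ddf.get_partition(0).compute()   # pandas DataFrame for the first SOLR page
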
From 613c781ea3d4396381bd805048f8f71dd2a2cc64 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patrick=20Sodr=C3=A9?=
Date: Fri, 17 Jul 2020 00:18:10 -0400
Subject: [PATCH 3/6] Implement SequenceSource.read

---
 intake_solr/source.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/intake_solr/source.py b/intake_solr/source.py
index 4d4785d..0b2de92 100644
--- a/intake_solr/source.py
+++ b/intake_solr/source.py
@@ -108,6 +108,14 @@ def _get_partition(self, index):
                         for k, v in d.items()})
         return out
 
+    def _close(self):
+        pass
+
+    def read(self):
+        self._load_metadata()
+        from itertools import chain
+        return chain(*(self._get_partition(index) for index in range(self.npartitions)))
+
     def to_dask(self):
         from dask import delayed
         import dask.bag

From 89553a659650de3817ebd68393062b8bd6e1ba80 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patrick=20Sodr=C3=A9?=
Date: Fri, 17 Jul 2020 00:23:45 -0400
Subject: [PATCH 4/6] Implement TableSource.read

---
 intake_solr/source.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/intake_solr/source.py b/intake_solr/source.py
index 0b2de92..8e5c1b1 100644
--- a/intake_solr/source.py
+++ b/intake_solr/source.py
@@ -176,6 +176,10 @@ def _get_partition(self, index):
         columns = self.qargs["fl"] if "fl" in self.qargs else sorted(seq[0].keys())
         return pd.DataFrame(seq, columns=columns)
 
+    def read(self):
+        self._load_metadata()
+        return pd.concat(self._get_partition(index) for index in range(self.npartitions))
+
     def to_dask(self):
         from dask import delayed
         import dask.dataframe

From 108b0f52d699fec8232c98759cec7ea28940d4ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patrick=20Sodr=C3=A9?=
Date: Fri, 17 Jul 2020 00:24:28 -0400
Subject: [PATCH 5/6] Don't hard-code TEST_DATA_DIR

---
 tests/test_intake_solr.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_intake_solr.py b/tests/test_intake_solr.py
index d969036..8a0bc13 100644
--- a/tests/test_intake_solr.py
+++ b/tests/test_intake_solr.py
@@ -9,7 +9,7 @@
 from .util import start_solr, stop_docker, TEST_CORE
 
 CONNECT = {'host': 'localhost', 'port': 9200}
-TEST_DATA_DIR = 'tests'
+TEST_DATA_DIR = os.path.abspath(os.path.dirname(__file__))
 TEST_DATA = 'sample1.csv'
 df = pd.read_csv(os.path.join(TEST_DATA_DIR, TEST_DATA))

From 86634729c3a5956cd00c81d4dcc735442cc19288 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patrick=20Sodr=C3=A9?=
Date: Fri, 17 Jul 2020 00:50:18 -0400
Subject: [PATCH 6/6] Only read the first 10 records to discover the schema.

---
 intake_solr/source.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/intake_solr/source.py b/intake_solr/source.py
index 8e5c1b1..d63ee2b 100644
--- a/intake_solr/source.py
+++ b/intake_solr/source.py
@@ -162,10 +162,15 @@ def _get_schema(self, retry=2):
         """Get schema from first 10 hits or cached dataframe"""
         schema = super()._get_schema()
 
-        df = self._get_partition(0)
-        schema["dtype"] = {k: str(v)
-                           for k, v in df.dtypes.to_dict().items()}
-        schema["shape"] = (schema["shape"][0], *df.shape[1:])
+        prev_partition_len = self.partition_len
+        try:
+            self.partition_len = 10
+            df = self._get_partition(0)
+            schema["dtype"] = {k: str(v) for k, v in df.dtypes.to_dict().items()}
+            schema["shape"] = (schema["shape"][0], *df.shape[1:])
+        finally:
+            self.partition_len = prev_partition_len
+
         return schema
 
     def _get_partition(self, index):
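
A hypothetical end-to-end sketch combining the read() methods from PATCH 3/6 and 4/6
with the 10-record schema probe from PATCH 6/6; the connection details and query are
again invented for illustration.

    from intake_solr import SOLRSequenceSource, SOLRTableSource

    seq = SOLRSequenceSource("*:*", "http://localhost:8983/solr", "mycore",
                             partition_len=500)
    records = list(seq.read())   # chains every partition into one iterator of dicts

    tab = SOLRTableSource("*:*", "http://localhost:8983/solr", "mycore",
                          partition_len=500)
    schema = tab.discover()      # dtypes inferred from a 10-row probe
    df = tab.read()              # pd.concat of all partition DataFrames
    # len(df) should equal schema["shape"][0], i.e. the total hit count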