Add partitioned support to Sequence and Table sources #8
base: master

Changes from 5 commits:
34f7454
77b6d14
613c781
89553a6
108b0f5
8663472
File: __init__.py

@@ -1,7 +1,4 @@
 from ._version import get_versions
 __version__ = get_versions()['version']
 del get_versions
-
-import intake  # Import this first to avoid circular imports during discovery.
-del intake
 from .source import SOLRSequenceSource, SOLRTableSource
File: source.py

@@ -1,3 +1,5 @@
+import math
+
 from intake.source import base
 import pandas as pd
 import pysolr
@@ -28,18 +30,25 @@ class SOLRSequenceSource(base.DataSource):
     zoocollection: bool or str
         If using Zookeeper to orchestrate SOLR, this is the name of the
         collection to connect to.
+    partition_len: int or None
+        The desired partition size. [default: 1024]
Comment: We add a new parameter that limits the number of rows that are returned per partition.

Comment: I looked at intake-es and it seems they use the … ❓ Let me know if you prefer that option.

Comment: I don't have a strong preference, but consistency might be good.
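A minimal usage sketch of the new parameter (hypothetical URL and core name; assumes the plugin is installed so intake registers an open_solr function from the driver name):

    import intake

    # partition_len=500 caps each partition at 500 documents;
    # partition_len=None would keep the full result set in one partition.
    source = intake.open_solr(
        query="*:*",
        base_url="http://localhost:8983/solr",
        core="mycore",
        partition_len=500,
    )
    print(source.discover()["npartitions"])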
     """
     container = 'python'
     name = 'solr'
     version = __version__
-    partition_access = False
+    partition_access = True

     def __init__(self, query, base_url, core, qargs=None, metadata=None,
-                 auth=None, cert=None, zoocollection=False):
+                 auth=None, cert=None, zoocollection=False,
+                 partition_len=1024):
         self.query = query
         self.qargs = qargs or {}
         self.metadata = metadata or {}
         self._schema = None
+        self.partition_len = partition_len
+
+        # Reject zero as well as negative sizes.
+        if partition_len is not None and partition_len <= 0:
+            raise ValueError(f"partition_len must be None or positive, got {partition_len}")
Comment: We should verify that the old behavior was actually working. On my system, if we don't set the number of rows to return, then we only get back the first ten records of the dataset.

Comment: A test would be good. Does setting partition_len -> +inf (a very large number) work for one-partition output?
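The ten-record observation matches Solr's default of rows=10. A quick check with pysolr (a sketch; assumes a local Solr instance and a hypothetical core name):

    import pysolr

    solr = pysolr.Solr("http://localhost:8983/solr/mycore")

    # Without an explicit rows parameter, Solr reports all hits but
    # only returns the first ten documents.
    results = solr.search("*:*")
    print(results.hits, len(results.docs))    # e.g. 2500 10

    # A very large rows value approximates single-partition output.
    results = solr.search("*:*", rows=10_000_000)
    print(results.hits, len(results.docs))    # e.g. 2500 2500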
         if auth == 'kerberos':
             from requests_kerberos import HTTPKerberosAuth, OPTIONAL
             auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
@@ -64,24 +73,57 @@ def __init__(self, query, base_url, core, qargs=None, metadata=None,
         super(SOLRSequenceSource, self).__init__(metadata=metadata)

     def _get_schema(self):
-        return base.Schema(datashape=None,
-                           dtype=None,
-                           shape=None,
-                           npartitions=1,
-                           extra_metadata={})
+        """Do a 0 row query and get the number of hits from the response"""
+        qargs = self.qargs.copy()
+        qargs["rows"] = 0
+        start = qargs.get("start", 0)
Comment: The user may want to start the query at a different position, so we take that into account.

Comment: 📜 If we add support for cursors, then we can't use the start parameter.

Comment: I don't know what people would normally use.
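For reference, cursor-based deep paging with pysolr looks roughly like this (a sketch; Solr requires a sort on the uniqueKey field, assumed here to be id, and cursorMark cannot be combined with start):

    import pysolr

    solr = pysolr.Solr("http://localhost:8983/solr/mycore")  # hypothetical core

    cursor = "*"
    while True:
        results = solr.search("*:*", rows=1024, sort="id asc", cursorMark=cursor)
        for doc in results.docs:
            pass  # process one document
        if results.nextCursorMark == cursor:  # cursor stops advancing at the end
            break
        cursor = results.nextCursorMark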
+        results = self.solr.search(self.query, **qargs)
+
+        if self.partition_len is None:
+            npartitions = 1
+        else:
+            npartitions = math.ceil((results.hits - start) / self.partition_len)
+
+        return base.Schema(
+            datashape=None,
+            dtype=None,
+            shape=(results.hits - start,),
Comment on lines +88 to +90:

Comment: ❓ What is the difference between datashape and shape?

Comment: datashape isn't used; it was meant for forward compatibility with complex types (struct, nested list).
+            npartitions=npartitions,
+            extra_metadata={},
+        )
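A quick sanity check of the partition arithmetic above (hypothetical numbers):

    import math

    hits, start, partition_len = 2500, 100, 1024
    npartitions = math.ceil((hits - start) / partition_len)
    assert npartitions == 3   # 2400 documents split as 1024 + 1024 + 352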

-    def _do_query(self):
+    def _do_query(self, index):
+        qargs = self.qargs.copy()
+        if self.partition_len is not None:
+            qargs["start"] = qargs.get("start", 0) + index * self.partition_len
+            qargs["rows"] = self.partition_len
+        return self.solr.search(self.query, **qargs)
+
Comment: Let's return the raw results of the query at this point. There are other valuable fields, like …

Comment: ok.
+    def _get_partition(self, index):
         """Downloads all data in query response"""
+        solr_rv = self._do_query(index)
         out = []
-        data = self.solr.search(self.query, **self.qargs).docs
-        for d in data:
+        for d in solr_rv.docs:
             out.append({k: (v[0] if isinstance(v, (tuple, list)) else v)
                         for k, v in d.items()})
         return out

-    def _get_partition(self, _):
-        """Downloads all data
-        """
-        return self._do_query()
     def _close(self):
         pass
+
+    def read(self):
+        self._load_metadata()
+        from itertools import chain
+        return chain(*(self._get_partition(index) for index in range(self.npartitions)))
+
+    def to_dask(self):
+        from dask import delayed
+        import dask.bag
+
+        self._load_metadata()
+        return dask.bag.from_delayed(
+            [delayed(self.read_partition)(i) for i in range(self.npartitions)]
+        )
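A usage sketch for the partitioned sequence source with dask.bag (hypothetical URL and core; assumes the driver is registered as open_solr):

    import intake

    source = intake.open_solr(query="*:*", base_url="http://localhost:8983/solr",
                              core="mycore", partition_len=1024)

    bag = source.to_dask()        # one bag partition per Solr partition
    print(bag.npartitions)        # matches source.discover()["npartitions"]
    print(bag.count().compute())  # total number of documents, read lazily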

 class SOLRTableSource(SOLRSequenceSource):

@@ -108,32 +150,42 @@ class SOLRTableSource(SOLRSequenceSource):
     zoocollection: bool or str
         If using Zookeeper to orchestrate SOLR, this is the name of the
         collection to connect to.
+    partition_len: int or None
+        The desired partition size. [default: 1024]
     """

     name = 'solrtab'
     container = 'dataframe'
     partition_access = True
-    def _get_schema(self, retry=2):
-        """Get schema from first 10 hits or cached dataframe"""
-        if not hasattr(self, '_dataframe'):
-            self._get_partition(0)
-        dtype = {k: str(v)
-                 for k, v in self._dataframe.dtypes.to_dict().items()}
-        return base.Schema(datashape=None,
-                           dtype=dtype,
-                           shape=self._dataframe.shape,
-                           npartitions=1,
-                           extra_metadata={})
+    def _get_schema(self):
+        schema = super()._get_schema()
Comment: We get the Schema from SOLRSequenceSource. This contains the number of partitions and the total number of records, but not the dtype.

Comment: Is it not worth grabbing the result head to figure this out (on request)?
+
+        df = self._get_partition(0)
Comment: This loads the first partition into a dataframe and uses it to discover the returned schema.
+        schema["dtype"] = {k: str(v)
+                           for k, v in df.dtypes.to_dict().items()}
+        schema["shape"] = (schema["shape"][0], *df.shape[1:])
+        return schema
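A sketch of what discovery might now report for the table source (hypothetical values and column names; assumes the driver is registered as open_solrtab). Note that the columns come back sorted unless an explicit field list (fl) is given in qargs:

    import intake

    source = intake.open_solrtab(query="*:*", base_url="http://localhost:8983/solr",
                                 core="mycore", partition_len=1024)
    print(source.discover())
    # e.g. {'dtype': {'id': 'object', 'price': 'float64'},
    #       'shape': (2500, 2), 'npartitions': 3, ...}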
-    def _get_partition(self, _):
-        """Downloads all data
-        """
-        if not hasattr(self, '_dataframe'):
-            df = pd.DataFrame(self._do_query())
-            self._dataframe = df
-            self._schema = None
-            self.discover()
-        return self._dataframe
+    def _get_partition(self, index):
+        """Downloads all data in the partition
+        """
+        seq = super()._get_partition(index)
+        # Columns are sorted unless the user defines the field list (fl)
+        columns = self.qargs["fl"] if "fl" in self.qargs else sorted(seq[0].keys())
+        return pd.DataFrame(seq, columns=columns)
+
+    def read(self):
+        self._load_metadata()
+        return pd.concat(self._get_partition(index) for index in range(self.npartitions))
+
+    def to_dask(self):
+        from dask import delayed
+        import dask.dataframe
+
+        self._load_metadata()
+        return dask.dataframe.from_delayed(
+            [delayed(self.read_partition)(i) for i in range(self.npartitions)]
Comment: There is also …
+        )

     def _close(self):
         self._dataframe = None
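One option worth noting here (an assumption about a possible direction, not something this PR does): dask.dataframe.from_delayed accepts a meta argument, so the dtypes discovered in _get_schema could be passed along to spare dask from materializing a partition just to infer column types. A sketch:

    import dask.dataframe
    import pandas as pd
    from dask import delayed

    # source: a SOLRTableSource as in the earlier sketches, already discovered.
    source.discover()
    # meta: an empty frame carrying the already-discovered dtypes
    # (hypothetical columns shown).
    meta = pd.DataFrame({"id": pd.Series(dtype="object"),
                         "price": pd.Series(dtype="float64")})
    ddf = dask.dataframe.from_delayed(
        [delayed(source.read_partition)(i) for i in range(source.npartitions)],
        meta=meta,
    )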
File: tests (filename not shown in this view)

@@ -9,7 +9,7 @@
 from .util import start_solr, stop_docker, TEST_CORE

 CONNECT = {'host': 'localhost', 'port': 9200}
-TEST_DATA_DIR = 'tests'
+TEST_DATA_DIR = os.path.abspath(os.path.dirname(__file__))
Comment: I was having issues running the tests from within PyCharm; this fixed the problem for me.

Comment: Seems good practice - running the tests should not require a particular CWD.
 TEST_DATA = 'sample1.csv'
 df = pd.read_csv(os.path.join(TEST_DATA_DIR, TEST_DATA))
Comment (on the removed import intake lines in __init__.py): I got exceptions when running with dask distributed. Removing these lines fixed the issue.

Comment: With the move to entrypoints to declare the drivers, I hope this is no longer needed.
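For context, declaring intake drivers via entrypoints looks roughly like this in setup.py (a sketch; the package and module paths are assumptions based on this diff, not taken from the PR):

    from setuptools import setup

    setup(
        name="intake-solr",
        entry_points={
            "intake.drivers": [
                "solr = intake_solr.source:SOLRSequenceSource",
                "solrtab = intake_solr.source:SOLRTableSource",
            ]
        },
    )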