Add partitioned support to Sequence and Table sources #8
base: master
Conversation
@martindurant, let me know if you think this approach works for handling partitions in intake-solr.
@@ -28,18 +30,25 @@ class SOLRSequenceSource(base.DataSource):
    zoocollection: bool or str
        If using Zookeeper to orchestrate SOLR, this is the name of the
        collection to connect to.
    partition_len: int or None
        The desired partition size. [default: 1024]
We add a new parameter that limits the number of rows that are returned per partition.
I looked at intake-es and it seems they use `npartitions` as an input instead of `partition_len`.
❓ Let me know if you prefer that option.
I don't have a strong preference, but consistency might be good
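For illustration, the two parametrizations are interchangeable once the total hit count is known; a minimal sketch (helper names are made up, not part of this diff):

```python
import math

def npartitions_from_len(total_hits: int, partition_len: int) -> int:
    """Number of partitions implied by a fixed partition length."""
    return math.ceil(total_hits / partition_len)

def len_from_npartitions(total_hits: int, npartitions: int) -> int:
    """Partition length implied by a fixed partition count (intake-es style)."""
    return math.ceil(total_hits / npartitions)

# e.g. 10_000 hits with partition_len=1024 -> 10 partitions of at most 1024 rows
```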
self.partition_len = partition_len

if partition_len and partition_len <= 0:
    raise ValueError(f"partition_len must be None or positive, got {partition_len}")
When `partition_len` is None, we get the old behavior of not setting the row count.
We should verify that the old behavior was actually working. On my system, if we don't set the number of rows to return, then we only get back the first ten records of the dataset.
A test would be good. Does setting partition_len -> +inf (very large number) work for one-partition output?
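As a hedged illustration of the behavior described above (placeholder core URL): Solr's default row limit is 10 unless `rows` is set, which is why an unset row count only returned the first ten records.

```python
import pysolr

solr = pysolr.Solr("http://localhost:8983/solr/mycore")  # hypothetical core URL

results = solr.search("*:*")            # no `rows` -> Solr returns at most 10 docs
print(results.hits, len(results.docs))  # total hit count vs. docs actually returned

# a very large `rows` value effectively gives one partition holding everything
everything = solr.search("*:*", rows=results.hits)
```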
"""Do a 0 row query and get the number of hits from the response""" | ||
qargs = self.qargs.copy() | ||
qargs["rows"] = 0 | ||
start = qargs.get("start", 0) |
The user may want to start the query at a different position, so we take that into account.
📜 If we add support for cursors, then we can't use the `start` option, according to the SOLR documentation.
I don't know what people would normally use.
Does offsetting with start cause the server to scan the whole table, or is solr smart here?
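A standalone sketch of the hit-count lookup in this hunk, assuming the source holds a `pysolr.Solr` client and a plain dict of query parameters:

```python
import pysolr

def count_hits(solr: pysolr.Solr, query: str, qargs: dict) -> int:
    """Return the number of hits reachable from the user's `start` offset."""
    qargs = dict(qargs)
    qargs["rows"] = 0                 # fetch no documents, only the hit count
    start = qargs.get("start", 0)     # honor a user-supplied offset
    results = solr.search(query, **qargs)
    return results.hits - start

# Per the Solr documentation, cursorMark requires start=0, so a cursor-based
# implementation could not also honor a user-supplied `start`.
```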
datashape=None,
dtype=None,
shape=(results.hits - start,),
❓ What is the difference between datashape and shape?
datashape isn't used; it was meant for forward compatibility with complex types (struct, nested list)
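For context, a rough sketch of the object these fields feed into (assuming intake's base `Schema`; the values are placeholders and the exact fields depend on the installed intake version):

```python
from intake.source.base import Schema

total_hits, start, npartitions = 10_000, 0, 10   # placeholder values

schema = Schema(
    datashape=None,                # unused; kept for forward compatibility
    dtype=None,                    # unknown until a partition is actually fetched
    shape=(total_hits - start,),   # records reachable from the user's offset
    npartitions=npartitions,
    extra_metadata={},
)
```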
if self.partition_len is not None:
    qargs["start"] = qargs.get("start", 0) + index * self.partition_len
    qargs["rows"] = self.partition_len
return self.solr.search(self.query, **qargs)
Let's return the raw results of the query at this point. There are other valuable fields, like `facets`, that can be used in sub-classes.
ok.
Are facets another useful way to partition? Are there shards too?
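A self-contained sketch of the per-partition read this hunk implements, returning the raw `pysolr.Results` so subclasses can still reach fields such as `facets` (the function signature is made up for illustration):

```python
import pysolr

def read_partition(solr: pysolr.Solr, query: str, qargs: dict,
                   index: int, partition_len: int) -> pysolr.Results:
    """Fetch partition `index` as a raw pysolr Results object."""
    qargs = dict(qargs)
    qargs["start"] = qargs.get("start", 0) + index * partition_len
    qargs["rows"] = partition_len
    return solr.search(query, **qargs)
```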
def _get_partition(self, _):
    """Downloads all data
schema = super()._get_schema()
We get the Schema from SOLRSequenceSource. This contains the number of partitions and the total number of records, but not the dtype.
Is it not worth grabbing the result head to figure this out (on request)?
intake_solr/source.py (outdated)
"""Downloads all data | ||
schema = super()._get_schema() | ||
|
||
df = self._get_partition(0) |
This loads the first partition into a dataframe and uses it to discover the returned schema. Note that the schema might be different from the overall SOLR core schema because the user can select a subset of fields using the `fl` qarg.
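A hedged sketch of that first-partition schema discovery (placeholder URL and `fl` fields); the inferred columns track whatever `fl` selected, not the full core schema:

```python
import pandas as pd
import pysolr

solr = pysolr.Solr("http://localhost:8983/solr/mycore")    # hypothetical core URL

first = solr.search("*:*", rows=1024, fl="id,name")        # first partition only
df = pd.DataFrame(first.docs)
dtype = {name: str(dt) for name, dt in df.dtypes.items()}  # e.g. {"id": "object", ...}
```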
@@ -9,7 +9,7 @@
from .util import start_solr, stop_docker, TEST_CORE

CONNECT = {'host': 'localhost', 'port': 9200}
TEST_DATA_DIR = 'tests'
TEST_DATA_DIR = os.path.abspath(os.path.dirname(__file__))
I was having issues running the tests from within PyCharm; this fixed the problem for me.
seems good practice - should not require a particular CWD
@@ -1,7 +1,4 @@
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

import intake  # Import this first to avoid circular imports during discovery.
del intake
I got exceptions when running with dask distributed. Removing these lines fixed the issue.
With the move to entrypoints to declare the drivers, I hope this is no longer needed
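For reference, a sketch of the entrypoint-based registration being referred to (the driver names here are illustrative, not taken from this repo's setup.py):

```python
from setuptools import setup

setup(
    name="intake-solr",
    entry_points={
        "intake.drivers": [
            "solr = intake_solr.source:SOLRSequenceSource",
            "solrtab = intake_solr.source:SOLRTableSource",
        ]
    },
)
```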
Looking pretty good.
I think some tests will clarify usage and correctness.
The current code clearly demonstrates my ignorance of how SOLR really works - thank you for this!
self._load_metadata()
return dask.dataframe.from_delayed(
    [delayed(self.read_partition)(i) for i in range(self.npartitions)]
There is also `bag.to_dataframe`, which may be less code and would reuse the sequence partitions.
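To make the comparison concrete, a small standalone sketch of both routes (the stub readers stand in for the real per-partition Solr fetch):

```python
import pandas as pd
import dask.bag as db
import dask.dataframe as dd
from dask import delayed

npartitions = 4

def read_docs(i):
    """Stand-in for fetching one partition's documents from Solr."""
    return [{"id": i * 2, "name": "a"}, {"id": i * 2 + 1, "name": "b"}]

def read_partition(i):
    """Stand-in for the dataframe-per-partition read used in the hunk above."""
    return pd.DataFrame(read_docs(i))

# Route 1 (as in the hunk): one delayed pandas frame per partition.
ddf = dd.from_delayed([delayed(read_partition)(i) for i in range(npartitions)])

# Route 2 (the suggestion): reuse the sequence partitions as a bag of dicts
# and let dask build the dataframe from them.
ddf2 = db.from_delayed([delayed(read_docs)(i) for i in range(npartitions)]).to_dataframe()
```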
Hi @martindurant, I was pulled into a different task at work, and I won't be able to come back to this for about a month. If it is okay, I would like to keep this open as a draft.
No problem - ping me when you want me to have a look.
This PR tries to add support for partitioned access to a Solr collection. By default, we define `partition_len` to be 1024 records. During metadata lookup we can get the total number of records in the Solr collection from the query's hit count. The number of partitions then becomes `ceil(numRecords / partition_len)`.
📜 Note on `to_dask`: the problem is that cursors can only be obtained by iterating one page at a time (this can't be parallelized). Fortunately, we only need the document IDs and not the entire SOLR response, so the network transfer cost is small.
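As a rough sketch of the cursor-based ID collection described above (placeholder URL; `id` is assumed to be the uniqueKey, and the `nextCursorMark` handling follows pysolr's documented behavior):

```python
import math
import pysolr

solr = pysolr.Solr("http://localhost:8983/solr/mycore")   # hypothetical core URL
partition_len = 1024

ids, cursor = [], "*"
while True:
    results = solr.search("*:*", rows=partition_len, fl="id",
                          sort="id asc", cursorMark=cursor)
    ids.extend(doc["id"] for doc in results.docs)
    if results.nextCursorMark == cursor:   # cursor did not advance -> no more pages
        break
    cursor = results.nextCursorMark

npartitions = math.ceil(len(ids) / partition_len)   # ceil(numRecords / partition_len)
```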