Skip to content

Commit

Permalink
Implement Pagination with Start Key for CouchDB Views (#95)
Browse files Browse the repository at this point in the history
* add pagination to query
* update tests
  • Loading branch information
Phoenix009 authored May 29, 2024
1 parent b8506ae commit 19aad3e
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 8 deletions.
15 changes: 13 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 64 additions & 5 deletions pycouchdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from . import exceptions as exp
from .resource import Resource


DEFAULT_BASE_URL = os.environ.get('COUCHDB_URL', 'http://localhost:5984/')


Expand Down Expand Up @@ -55,6 +54,7 @@ class _StreamResponse(object):
See more on:
http://docs.python-requests.org/en/latest/user/advanced/#streaming-requests
"""

def __init__(self, response):
self._response = response

Expand Down Expand Up @@ -658,7 +658,7 @@ def one(self, name, flat=None, wrapper=None, **kwargs):

params = utils.encode_view_options(params)
result = list(self._query(self.resource(*path), wrapper=wrapper,
flat=flat, params=params, data=data))
flat=flat, params=params, data=data))

return result[0] if len(result) > 0 else None

Expand All @@ -680,7 +680,57 @@ def _query(self, resource, data=None, params=None, headers=None,
for row in result["rows"]:
yield wrapper(row)

def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs):
def _query_paginate(self, resource, pagesize, data=None, params=None, headers=None,
                    flat=None, wrapper=None):
    """
    Lazily iterate over the rows of a view, fetching them page by page.

    Each request asks for ``pagesize + 1`` rows. The extra row is not
    yielded; its ``key`` (and ``id``, when present) become the
    ``startkey`` / ``startkey_docid`` of the next request, so that rows
    sharing the same key are neither skipped nor repeated.

    :param resource: object exposing ``get``/``post`` that return a
                     ``(response, result)`` pair where ``result["rows"]``
                     is the list of rows.
    :param pagesize: number of rows to yield per request.
    :param data: optional request body; when given, ``post`` is used
                 instead of ``get``.
    :param params: already-encoded view options. A ``limit`` entry is
                   honoured as the total number of rows to yield. The
                   dict passed in is never modified.
    :param headers: optional headers forwarded to every request.
    :param flat: when not None, yield ``row[flat]`` instead of the row
                 (takes precedence over ``wrapper``).
    :param wrapper: callable applied to each row before it is yielded.

    :returns: generator of (wrapped) rows
    """
    if wrapper is None:
        wrapper = lambda row: row

    if flat is not None:
        wrapper = lambda row: row[flat]

    # Work on a private copy: the caller's dict must not be altered, and
    # ``params`` may legitimately be None.
    params = dict(params or {})

    remaining = params.get('limit', float('inf'))
    params['limit'] = pagesize + 1

    while True:
        if data is None:
            (resp, result) = resource.get(params=params, headers=headers)
        else:
            (resp, result) = resource.post(
                data=data, params=params, headers=headers)

        rows = result["rows"]

        # A full "pagesize + 1" response means there is at least one more
        # row; the last row is only a cursor for the next request.
        has_more = len(rows) == pagesize + 1
        page = rows[:-1] if has_more else rows

        for row in page:
            if remaining <= 0:
                return

            yield wrapper(row)
            remaining -= 1

        if not has_more or remaining <= 0:
            return

        cursor = rows[-1]

        # Encode only the new start key. Every other option was encoded by
        # the caller already and must not be encoded a second time.
        params['startkey'] = json.dumps(cursor["key"])

        # The doc id disambiguates rows that share the same key. Rows
        # without an "id" cannot (and need not) be disambiguated.
        if "id" in cursor:
            params['startkey_docid'] = cursor["id"]
        else:
            params.pop('startkey_docid', None)

def query(self, name, wrapper=None, flat=None, pagesize=None, as_list=False, **kwargs):
"""
Execute a design document view query.
Expand All @@ -690,6 +740,7 @@ def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs):
default lazy generator.
:param flat: get a specific field from a object instead
of a complete object.
:param pagesize: Paginate the query response with `pagesize` rows per page.
.. versionadded: 1.4
Add as_list parameter.
Expand All @@ -708,8 +759,16 @@ def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs):
data = utils.force_bytes(json.dumps(data))

params = utils.encode_view_options(params)
result = self._query(self.resource(*path), wrapper=wrapper,
flat=flat, params=params, data=data)

if pagesize is None:
result = self._query(self.resource(*path), wrapper=wrapper,
flat=flat, params=params, data=data)
else:
assert isinstance(pagesize, int), "pagesize should be a positive integer"
assert pagesize > 0, "pagesize should be a positive integer"

result = self._query_paginate(self.resource(*path), pagesize=pagesize, wrapper=wrapper,
flat=flat, params=params, data=data)

if as_list:
return list(result)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ classifiers = [
[tool.poetry.dependencies]
python = ">=3.8.1,<4"
requests = "^2.28"
chardet = "^5.2.0"

[tool.poetry.dev-dependencies]
responses = "^0.22"
Expand Down
109 changes: 108 additions & 1 deletion test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ def test_compact_view_02(db):


def test_attachments_01(db, rec_with_attachment):

doc = db.get("kk1")
assert "_attachments" in doc

Expand Down Expand Up @@ -462,3 +461,111 @@ def test_regression_unexpected_deletion_of_attachment(db, rec_with_attachment):

assert "_attachments" in doc
assert "sample.txt" in doc["_attachments"]


@pytest.fixture
def view(db):
    """
    Provide the ``testing/names`` view over six documents with distinct names.

    The design document and the documents are saved before the test runs;
    the design document is deleted again afterwards. No reduce function
    is defined on the view.
    """
    people = [
        ("kk1", "Florian"),
        ("kk2", "Raphael"),
        ("kk3", "Jaideep"),
        ("kk4", "Andrew"),
        ("kk5", "Pepe"),
        ("kk6", "Alex"),
    ]
    design_doc = {
        "_id": "_design/testing",
        "views": {
            "names": {
                "map": "function(doc) { emit(doc.name, 1); }",
            },
        },
    }

    db.save(design_doc)
    db.save_bulk([{"_id": doc_id, "name": name} for doc_id, name in people])

    yield

    db.delete("_design/testing")


@pytest.fixture
def view_duplicate_keys(db):
    """
    Provide the ``testing/names`` view over six documents sharing one name.

    Every document emits the same key ("Andrew"). The design document is
    saved before the test and deleted afterwards. No reduce function is
    defined on the view.
    """
    doc_ids = ("kk1", "kk2", "kk3", "kk4", "kk5", "kk6")
    design_doc = {
        "_id": "_design/testing",
        "views": {
            "names": {
                "map": "function(doc) { emit(doc.name, 1); }",
            },
        },
    }

    db.save(design_doc)
    db.save_bulk([{"_id": doc_id, "name": "Andrew"} for doc_id in doc_ids])

    yield

    db.delete("_design/testing")


def test_pagination(db, view):
    """Paginated queries validate ``pagesize`` and return each row once."""
    expected_message = "pagesize should be a positive integer"

    # A non-integer and a non-positive pagesize must both be rejected.
    for bad_pagesize in ("123", 0):
        with pytest.raises(AssertionError) as excinfo:
            db.query("testing/names", pagesize=bad_pagesize)

        assert expected_message in str(excinfo.value)

    # One row per page still has to deliver all six rows ...
    rows = list(db.query("testing/names", pagesize=1))
    assert len(rows) == 6

    # ... and none of them twice.
    unique_ids = {row['id'] for row in rows}
    assert len(unique_ids) == 6


def test_duplicate_keys_pagination(db, view_duplicate_keys):
    """Pagination returns every row exactly once when all keys are equal."""
    # The six documents span two pages of four rows.
    records = list(db.query("testing/names", pagesize=4))
    assert len(records) == 6

    # Check no duplicate records are retrieved
    record_ids = {record['id'] for record in records}
    assert len(record_ids) == 6


def test_limit_pagination(db, view_duplicate_keys):
    """The ``limit`` option caps the total number of paginated rows."""
    # Case 1: a limit below the number of documents is respected.
    first_three = list(db.query("testing/names", pagesize=10, limit=3))
    assert len(first_three) == 3
    assert len({row['id'] for row in first_three}) == 3

    # Case 2: a limit above the number of documents returns all six.
    everything = list(db.query("testing/names", pagesize=10, limit=10))
    assert len(everything) == 6
    assert len({row['id'] for row in everything}) == 6


def test_large_page_size(db, view_duplicate_keys):
    """A page size larger than the result set yields every row once."""
    rows = list(db.query("testing/names", pagesize=100))
    assert len(rows) == 6

    distinct_ids = {row['id'] for row in rows}
    assert len(distinct_ids) == 6

0 comments on commit 19aad3e

Please sign in to comment.