Skip to content

Commit

Permalink
Fix fromdicts generator support lazy (#626)
Browse files Browse the repository at this point in the history
* Fix generator support in fromdicts - use file cache instead of itertools.tee
* Documentation for generator support in fromdicts
* Generator support in fromdicts - use lazy file cache
* Simplify expression for dicts yield
  • Loading branch information
arturponinski authored Aug 21, 2022
1 parent f17ece9 commit c000bb5
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 11 deletions.
6 changes: 6 additions & 0 deletions docs/changes.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changes
=======

Version 1.7.11
--------------

* Fix generator support in fromdicts to use file cache
By :user:`arturponinski`, :issue:`625`.

Version 1.7.10
--------------

Expand Down
87 changes: 83 additions & 4 deletions petl/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

# standard library dependencies
import io
import itertools
import json
import inspect
from json.encoder import JSONEncoder
from os import unlink
from tempfile import NamedTemporaryFile

from petl.compat import PY2
from petl.compat import pickle
from petl.io.sources import read_source_from_arg, write_source_from_arg
# internal dependencies
from petl.util.base import data, Table, dicts as _dicts, iterpeek
Expand Down Expand Up @@ -140,6 +142,24 @@ def fromdicts(dicts, header=None, sample=1000, missing=None):
| 'c' | 2 |
+-----+-----+
Argument `dicts` can also be a generator; its output is iterated and
cached in a temporary file to support further transforms and multiple
passes over the table:
>>> import petl as etl
>>> dicts = ({"foo": chr(ord("a")+i), "bar":i+1} for i in range(3))
>>> table1 = etl.fromdicts(dicts, header=['foo', 'bar'])
>>> table1
+-----+-----+
| foo | bar |
+=====+=====+
| 'a' | 1 |
+-----+-----+
| 'b' | 2 |
+-----+-----+
| 'c' | 3 |
+-----+-----+
If `header` is not specified, `sample` items from `dicts` will be
inspected to discover dictionary keys. Note that the order in which
dictionary keys are discovered may not be stable,
Expand All @@ -156,6 +176,16 @@ def fromdicts(dicts, header=None, sample=1000, missing=None):
:func:`petl.transform.headers.sortheader` on the resulting table to
guarantee stability.
.. versionchanged:: 1.7.5
Full support of generators passed as `dicts` has been added, leveraging
`itertools.tee`.
.. versionchanged:: 1.7.11
Generator support has been modified to use temporary file cache
instead of `itertools.tee` due to high memory usage.
"""
view = DictsGeneratorView if inspect.isgenerator(dicts) else DictsView
return view(dicts, header=header, sample=sample, missing=missing)
Expand All @@ -175,9 +205,58 @@ def __iter__(self):

class DictsGeneratorView(DictsView):
    """View over a generator of dicts that can be iterated more than once.

    Each row pulled from the generator is pickled into a temporary file the
    first time it is produced. Later passes, and other live iterators over
    the same view, replay rows from that file before pulling anything new
    from the generator.
    """

    def __init__(self, dicts, header=None, sample=1000, missing=None):
        super(DictsGeneratorView, self).__init__(dicts, header, sample, missing)
        # Temporary file holding the pickled rows; created lazily by __iter__.
        self._filecache = None
        # Byte offset just past the last row written to the cache file.
        self._cached = 0

    def __iter__(self):
        """Yield the header, then every data row as a tuple.

        Rows already present in the file cache are read back from it; once
        the cache is exhausted, new items are taken from the underlying
        iterator, converted to rows, appended to the cache and yielded.
        """
        if not self._header:
            self._determine_header()
        yield self._header

        if self._filecache is None:
            # Unbuffered, so a row written by one iterator is immediately
            # readable by another iterator sharing the same file object.
            # The keyword for the buffer size differs between Python 2 and 3.
            if PY2:
                self._filecache = NamedTemporaryFile(
                    delete=False, mode='wb+', bufsize=0)
            else:
                self._filecache = NamedTemporaryFile(
                    delete=False, mode='wb+', buffering=0)

        # `position` is this iterator's own read offset into the cache;
        # `self._cached` is the shared end-of-data offset.
        position = 0
        it = iter(self.dicts)
        while True:
            if position < self._cached:
                # Replay a row that some iterator has already cached. Always
                # seek first: the file offset is shared between iterators.
                self._filecache.seek(position)
                row = pickle.load(self._filecache)
                position = self._filecache.tell()
                yield row
                continue
            try:
                o = next(it)
            except StopIteration:
                break
            row = tuple(o.get(f, self.missing) for f in self._header)
            # Append at the shared end offset, then advance both offsets.
            self._filecache.seek(self._cached)
            pickle.dump(row, self._filecache, protocol=-1)
            self._cached = position = self._filecache.tell()
            yield row

    def _determine_header(self):
        """Derive the header from the keys of up to `sample` leading items.

        Keys are collected in first-seen order. `self.dicts` is replaced by
        the iterator handed back by `iterpeek`, which is also returned.
        """
        it = iter(self.dicts)
        header = list()
        # NOTE(review): iterpeek is presumed to return an iterator that still
        # includes the sampled items -- confirm against petl.util.base.
        peek, it = iterpeek(it, self.sample)
        self.dicts = it
        if isinstance(peek, dict):
            # A single peeked item rather than a list of them.
            peek = [peek]
        for o in peek:
            if hasattr(o, 'keys'):
                header += [k for k in o.keys() if k not in header]
        self._header = tuple(header)
        return it

    def __del__(self):
        """Close and delete the cache file, if one was created."""
        # getattr: __init__ may have raised before the attribute was set.
        cache = getattr(self, '_filecache', None)
        if cache is None:
            return
        self._filecache = None
        try:
            cache.close()
        finally:
            try:
                unlink(cache.name)
            except OSError:
                # Best-effort cleanup: the file may already be gone, and a
                # finalizer has no caller to report the failure to.
                pass


def iterjlines(f, header, missing):
Expand Down Expand Up @@ -211,7 +290,7 @@ def iterdicts(dicts, header, sample, missing):

# generate data rows
for o in it:
yield tuple(o[f] if f in o else missing for f in header)
yield tuple(o.get(f, missing) for f in header)


def tojson(table, source=None, prefix=None, suffix=None, *args, **kwargs):
Expand Down
85 changes: 78 additions & 7 deletions petl/test/io/test_json.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division


from collections import OrderedDict
from tempfile import NamedTemporaryFile
import json

import pytest

from petl.test.helpers import ieq
from petl import fromjson, fromdicts, tojson, tojsonarrays
Expand Down Expand Up @@ -121,7 +122,6 @@ def test_fromdicts_onepass():


def test_fromdicts_ordered():
from collections import OrderedDict
data = [OrderedDict([('foo', 'a'), ('bar', 1)]),
OrderedDict([('foo', 'b')]),
OrderedDict([('foo', 'c'), ('bar', 2), ('baz', True)])]
Expand All @@ -134,6 +134,18 @@ def test_fromdicts_ordered():
ieq(expect, actual)


def test_fromdicts_missing():
    """Keys absent from a dict are filled with the `missing` value."""
    records = [
        OrderedDict([('foo', 'a'), ('bar', 1)]),
        OrderedDict([('foo', 'b')]),
        OrderedDict([('foo', 'c'), ('bar', 2), ('baz', True)]),
    ]
    table = fromdicts(records, missing="x")
    expected_rows = (
        ('foo', 'bar', 'baz'),
        ('a', 1, "x"),
        ('b', "x", "x"),
        ('c', 2, True),
    )
    ieq(expected_rows, table)


def test_tojson():

# exercise function
Expand Down Expand Up @@ -181,7 +193,6 @@ def test_fromdicts_header_does_not_raise():


def test_fromdicts_header_list():
from collections import OrderedDict
data = [OrderedDict([('foo', 'a'), ('bar', 1)]),
OrderedDict([('foo', 'b'), ('bar', 2)]),
OrderedDict([('foo', 'c'), ('bar', 2)])]
Expand All @@ -196,15 +207,36 @@ def test_fromdicts_header_list():
ieq(expect, actual)


def test_fromdicts_header_generator():
from collections import OrderedDict

@pytest.fixture
def dicts_generator():
    """Provide a generator yielding three ordered dicts with foo/bar keys."""
    def _records():
        for foo, bar in (('a', 1), ('b', 2), ('c', 2)):
            yield OrderedDict([('foo', foo), ('bar', bar)])
    return _records()


def test_fromdicts_generator_single(dicts_generator):
    """A single pass over a generator-backed table yields header and rows."""
    table = fromdicts(dicts_generator)
    expected_rows = (
        ('foo', 'bar'),
        ('a', 1),
        ('b', 2),
        ('c', 2),
    )
    ieq(expected_rows, table)

actual = fromdicts(generator())

def test_fromdicts_generator_twice(dicts_generator):
    """A generator-backed table gives the same rows on two passes."""
    table = fromdicts(dicts_generator)
    expected_rows = (
        ('foo', 'bar'),
        ('a', 1),
        ('b', 2),
        ('c', 2),
    )
    # Two consecutive comparisons, each making its own pass over the table.
    ieq(expected_rows, table)
    ieq(expected_rows, table)


def test_fromdicts_generator_header(dicts_generator):
actual = fromdicts(dicts_generator)
header = actual.header()
assert header == ('foo', 'bar')
expect = (('foo', 'bar'),
Expand All @@ -213,3 +245,42 @@ def generator():
('c', 2))
ieq(expect, actual)
ieq(expect, actual)


def test_fromdicts_generator_random_access():
    """Interleaved iterators over one generator-backed table stay in step."""
    def _numbered():
        for i in range(5):
            yield OrderedDict([('n', i), ('foo', 100*i), ('bar', 200*i)])

    expected_header = ('n', 'foo', 'bar')
    table = fromdicts(_numbered(), sample=3)
    assert table.header() == expected_header

    # first pass: take two items
    pass_a = iter(table)
    a1 = next(pass_a)
    a2 = next(pass_a)

    # second pass: take the same two items from a fresh iterator
    pass_b = iter(table)
    b1 = next(pass_b)
    b2 = next(pass_b)
    assert a1 == b1
    assert a2 == b2

    # reverse order: the second iterator advances ahead of the first
    b3 = next(pass_b)
    a3 = next(pass_a)
    assert b3 == a3

    ieq(table, table)
    assert table.header() == expected_header
    assert len(table) == 6


def test_fromdicts_generator_missing():
    """Generator input: keys absent from a dict get the `missing` value."""
    def _records():
        yield OrderedDict([('foo', 'a'), ('bar', 1)])
        yield OrderedDict([('foo', 'b'), ('bar', 2)])
        yield OrderedDict([('foo', 'c'), ('baz', 2)])

    table = fromdicts(_records(), missing="x")
    expected_rows = (
        ('foo', 'bar', 'baz'),
        ('a', 1, "x"),
        ('b', 2, "x"),
        ('c', "x", 2),
    )
    ieq(expected_rows, table)

0 comments on commit c000bb5

Please sign in to comment.