From c000bb5e50300c83cf75fca380ef63168aa888f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Poni=C5=84ski?= <89907595+arturponinski@users.noreply.github.com> Date: Sun, 21 Aug 2022 20:57:30 +0200 Subject: [PATCH] Fix fromdicts generator support lazy (#626) * Fix generator support in fromdicts - use file cache instead of itertools.tee * Documentation for generator support in fromdicts * Generator support in fromdicts - use lazy file cache * Simplify expression for dicts yield --- docs/changes.rst | 6 +++ petl/io/json.py | 87 +++++++++++++++++++++++++++++++++++++-- petl/test/io/test_json.py | 85 ++++++++++++++++++++++++++++++++++---- 3 files changed, 167 insertions(+), 11 deletions(-) diff --git a/docs/changes.rst b/docs/changes.rst index fac851a6..959422dc 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -1,6 +1,12 @@ Changes ======= +Version 1.7.11 +-------------- + +* Fix generator support in fromdicts to use file cache + By :user:`arturponinski`, :issue:`625`. + Version 1.7.10 -------------- diff --git a/petl/io/json.py b/petl/io/json.py index 9ddcfea2..4b6edbf4 100644 --- a/petl/io/json.py +++ b/petl/io/json.py @@ -3,12 +3,14 @@ # standard library dependencies import io -import itertools import json import inspect from json.encoder import JSONEncoder +from os import unlink +from tempfile import NamedTemporaryFile from petl.compat import PY2 +from petl.compat import pickle from petl.io.sources import read_source_from_arg, write_source_from_arg # internal dependencies from petl.util.base import data, Table, dicts as _dicts, iterpeek @@ -140,6 +142,24 @@ def fromdicts(dicts, header=None, sample=1000, missing=None): | 'c' | 2 | +-----+-----+ + Argument `dicts` can also be a generator, the output of generator + is iterated and cached using a temporary file to support further + transforms and multiple passes of the table: + + >>> import petl as etl + >>> dicts = ({"foo": chr(ord("a")+i), "bar":i+1} for i in range(3)) + >>> table1 = etl.fromdicts(dicts, header=['foo', 'bar']) + >>> table1 + +-----+-----+ + | foo | bar | + +=====+=====+ + | 'a' | 1 | + +-----+-----+ + | 'b' | 2 | + +-----+-----+ + | 'c' | 3 | + +-----+-----+ + If `header` is not specified, `sample` items from `dicts` will be inspected to discovery dictionary keys. Note that the order in which dictionary keys are discovered may not be stable, @@ -156,6 +176,16 @@ def fromdicts(dicts, header=None, sample=1000, missing=None): :func:`petl.transform.headers.sortheader` on the resulting table to guarantee stability. + .. versionchanged:: 1.7.5 + + Full support of generators passed as `dicts` has been added, leveraging + `itertools.tee`. + + .. versionchanged:: 1.7.11 + + Generator support has been modified to use temporary file cache + instead of `itertools.tee` due to high memory usage. + """ view = DictsGeneratorView if inspect.isgenerator(dicts) else DictsView return view(dicts, header=header, sample=sample, missing=missing) @@ -175,9 +205,58 @@ def __iter__(self): class DictsGeneratorView(DictsView): + def __init__(self, dicts, header=None, sample=1000, missing=None): + super(DictsGeneratorView, self).__init__(dicts, header, sample, missing) + self._filecache = None + self._cached = 0 + def __iter__(self): - self.dicts, dicts = itertools.tee(self.dicts) - return iterdicts(dicts, self._header, self.sample, self.missing) + if not self._header: + self._determine_header() + yield self._header + + if not self._filecache: + if PY2: + self._filecache = NamedTemporaryFile(delete=False, mode='wb+', bufsize=0) + else: + self._filecache = NamedTemporaryFile(delete=False, mode='wb+', buffering=0) + + position = 0 + it = iter(self.dicts) + while True: + if position < self._cached: + self._filecache.seek(position) + row = pickle.load(self._filecache) + position = self._filecache.tell() + yield row + continue + try: + o = next(it) + except StopIteration: + break + row = tuple(o.get(f, self.missing) for f in self._header) + self._filecache.seek(self._cached) + pickle.dump(row, self._filecache, protocol=-1) + self._cached = position = self._filecache.tell() + yield row + + def _determine_header(self): + it = iter(self.dicts) + header = list() + peek, it = iterpeek(it, self.sample) + self.dicts = it + if isinstance(peek, dict): + peek = [peek] + for o in peek: + if hasattr(o, 'keys'): + header += [k for k in o.keys() if k not in header] + self._header = tuple(header) + return it + + def __del__(self): + if self._filecache: + self._filecache.close() + unlink(self._filecache.name) def iterjlines(f, header, missing): @@ -211,7 +290,7 @@ def iterdicts(dicts, header, sample, missing): # generate data rows for o in it: - yield tuple(o[f] if f in o else missing for f in header) + yield tuple(o.get(f, missing) for f in header) def tojson(table, source=None, prefix=None, suffix=None, *args, **kwargs): diff --git a/petl/test/io/test_json.py b/petl/test/io/test_json.py index ca02239a..37dc85fe 100644 --- a/petl/test/io/test_json.py +++ b/petl/test/io/test_json.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, print_function, division - +from collections import OrderedDict from tempfile import NamedTemporaryFile import json +import pytest from petl.test.helpers import ieq from petl import fromjson, fromdicts, tojson, tojsonarrays @@ -121,7 +122,6 @@ def test_fromdicts_onepass(): def test_fromdicts_ordered(): - from collections import OrderedDict data = [OrderedDict([('foo', 'a'), ('bar', 1)]), OrderedDict([('foo', 'b')]), OrderedDict([('foo', 'c'), ('bar', 2), ('baz', True)])] @@ -134,6 +134,18 @@ def test_fromdicts_ordered(): ieq(expect, actual) +def test_fromdicts_missing(): + data = [OrderedDict([('foo', 'a'), ('bar', 1)]), + OrderedDict([('foo', 'b')]), + OrderedDict([('foo', 'c'), ('bar', 2), ('baz', True)])] + actual = fromdicts(data, missing="x") + expect = (('foo', 'bar', 'baz'), + ('a', 1, "x"), + ('b', "x", "x"), + ('c', 2, True)) + ieq(expect, actual) + + def test_tojson(): # exercise function @@ -181,7 +193,6 @@ def test_fromdicts_header_does_not_raise(): def test_fromdicts_header_list(): - from collections import OrderedDict data = [OrderedDict([('foo', 'a'), ('bar', 1)]), OrderedDict([('foo', 'b'), ('bar', 2)]), OrderedDict([('foo', 'c'), ('bar', 2)])] @@ -196,15 +207,36 @@ def test_fromdicts_header_list(): ieq(expect, actual) -def test_fromdicts_header_generator(): - from collections import OrderedDict - +@pytest.fixture +def dicts_generator(): def generator(): yield OrderedDict([('foo', 'a'), ('bar', 1)]) yield OrderedDict([('foo', 'b'), ('bar', 2)]) yield OrderedDict([('foo', 'c'), ('bar', 2)]) + return generator() + + +def test_fromdicts_generator_single(dicts_generator): + actual = fromdicts(dicts_generator) + expect = (('foo', 'bar'), + ('a', 1), + ('b', 2), + ('c', 2)) + ieq(expect, actual) - actual = fromdicts(generator()) + +def test_fromdicts_generator_twice(dicts_generator): + actual = fromdicts(dicts_generator) + expect = (('foo', 'bar'), + ('a', 1), + ('b', 2), + ('c', 2)) + ieq(expect, actual) + ieq(expect, actual) + + +def test_fromdicts_generator_header(dicts_generator): + actual = fromdicts(dicts_generator) header = actual.header() assert header == ('foo', 'bar') expect = (('foo', 'bar'), @@ -213,3 +245,42 @@ def generator(): ('c', 2)) ieq(expect, actual) ieq(expect, actual) + + +def test_fromdicts_generator_random_access(): + def generator(): + for i in range(5): + yield OrderedDict([('n', i), ('foo', 100*i), ('bar', 200*i)]) + + actual = fromdicts(generator(), sample=3) + assert actual.header() == ('n', 'foo', 'bar') + # first pass + it1 = iter(actual) + first_row1 = next(it1) + first_row2 = next(it1) + # second pass + it2 = iter(actual) + second_row1 = next(it2) + second_row2 = next(it2) + assert first_row1 == second_row1 + assert first_row2 == second_row2 + # reverse order + second_row3 = next(it2) + first_row3 = next(it1) + assert second_row3 == first_row3 + ieq(actual, actual) + assert actual.header() == ('n', 'foo', 'bar') + assert len(actual) == 6 + + +def test_fromdicts_generator_missing(): + def generator(): + yield OrderedDict([('foo', 'a'), ('bar', 1)]) + yield OrderedDict([('foo', 'b'), ('bar', 2)]) + yield OrderedDict([('foo', 'c'), ('baz', 2)]) + actual = fromdicts(generator(), missing="x") + expect = (('foo', 'bar', 'baz'), + ('a', 1, "x"), + ('b', 2, "x"), + ('c', "x", 2)) + ieq(expect, actual)