Skip to content

Commit

Permalink
Added batching module
Browse files Browse the repository at this point in the history
  • Loading branch information
sgeulette committed Mar 5, 2024
1 parent 39955d6 commit 1d47b89
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 6 deletions.
4 changes: 2 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ Changelog
1.0.0rc2 (unreleased)
---------------------

- Nothing changed yet.

- Added batching module.
[sgeulette]

1.0.0rc1 (2024-02-08)
---------------------
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
'collective.fingerpointing',
'collective.monkeypatcher',
'future>=0.18.2',
'imio.pyutils>=1.0.0a0',
'imio.pyutils>=1.0.0a1',
'natsort<7',
"pathlib2;python_version<'3'",
'plone.api>1.9.1',
Expand Down
170 changes: 170 additions & 0 deletions src/imio/helpers/batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# encoding: utf-8
# IMIO <support@imio.be>
#

"""
Batching utilities methods to do a process in multiple passes (divide a for loop).
Idea: a batch number, a commit number and a loop number are considered
1) we get a stored dictionary containing the treated keys (using load_pickle function)
2) if the key is already in the dictionary, we skip it (continue)
3) if the treated items number is >= batch number, we exit the for loop, do a commit and dump the dictionary
4) otherwise, we store the corresponding key in the dictionary and increase the loop number
5) when the current loop number is a multiple of the commit number, we do a commit and dump the dictionary
6) when the for loop is globally finished, we do a commit and dump the dictionary
7) when all the items are treated, we can delete the dictionary file
See `loop_process` function in `test_batching.py` file for a complete example.
"""

from datetime import datetime
from imio.pyutils.system import dump_pickle
from imio.pyutils.system import load_pickle

import logging
import os
import transaction


# module-level logger shared by all batching helpers below
logger = logging.getLogger('imio.helpers')


# 1) we get a stored dictionary containing the treated keys (using load_pickle function)
def batch_get_keys(infile, batch_number, batch_last, commit_number, loop_length=0, a_set=None):
    """Loads the already treated keys from the pickle file and builds the batching config.

    Must be used like this, before the loop:
    batch_keys, config = batch_get_keys(infile, batch_number, batch_last, commit_number)
    :param infile: file name where the set is stored
    :param batch_number: the batch number
    :param batch_last: boolean to know if it's the last batch run
    :param commit_number: the commit interval number
    :param loop_length: the loop length number
    :param a_set: a given data structure to get the stored keys
    :return: 2 values: 1) a_set filled with pickled data (None when batching is disabled),
             2) a config dict {'bn': batch_number, 'bl': batch_last, 'cn': commit_number,
             'll': loop_length, 'lc': 0, 'pf': infile}
    """
    infile = os.path.abspath(infile)
    # same config dict in both cases: build it once
    config = {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length, 'lc': 0, 'pf': infile}
    if not batch_number:  # batching disabled: no keys set at all
        return None, config
    if a_set is None:
        a_set = set()
    load_pickle(infile, a_set)  # fills a_set in place with previously treated keys
    return a_set, config


# 2) if the key is already in the dictionary, we skip it (continue)
def batch_skip_key(key, batch_keys, config):
    """Tells whether the current item was already treated in a previous run.
    Must be used like this, at the beginning of the loop:
    if batch_skip_key(key, batch_keys):
        continue
    :param key: the hashable key of the current item
    :param batch_keys: the treated keys set
    :param config: a config dict {'bn': batch_number, 'bl': last, 'cn': commit, 'll': loop_length, 'lc': loop_count,
                                  'pf': infile}
    :return: True if a "continue" must be done. False otherwise.
    """
    if batch_keys is None:  # batching disabled: nothing is ever skipped
        return False
    if key in batch_keys:
        return True
    # newly treated item: count it in the current run
    config['lc'] += 1
    return False


# 3) if the treated items number is higher than the batch number, we exit the loop, do a commit and dump the dictionary
# 4) otherwise, we store the corresponding key in the dictionary and increase the loop number
# 5) when the current loop number is a multiple of the commit number, we do a commit and dump the dictionary
def batch_handle_key(key, batch_keys, config):
    """Registers the treated key and decides whether the loop must be exited.
    Must be used like this, in the end of the loop:
    if batch_handle_key(key, batch_keys, config):
        break
    :param key: the hashable key of the current item
    :param batch_keys: the treated keys set
    :param config: a config dict {'bn': batch_number, 'bl': last, 'cn': commit, 'll': loop_length, 'lc': loop_count,
                                  'pf': infile}
    :return: True if the loop must be exited. False otherwise.
    """
    if batch_keys is None:  # batching disabled: never exit early
        return False
    batch_keys.add(key)
    # stopping batch ? the batch size for this run has been reached
    if config['lc'] >= config['bn']:
        if config['cn']:
            transaction.commit()
        config['ldk'] = key  # remember the last dumped key
        dump_pickle(config['pf'], batch_keys)
        logger.info("Batched %s / %s", len(batch_keys), config['ll'])
        # on the last run, warn if the whole loop does not appear globally finished
        if config['bl'] and not batch_globally_finished(batch_keys, config):
            logger.error('BATCHING MAYBE STOPPED TOO EARLY: %s / %s', len(batch_keys), config['ll'])
        return True
    # commit time ? every 'cn' newly treated items
    commit_time = config['cn'] and config['lc'] % config['cn'] == 0
    if commit_time:
        transaction.commit()
        config['ldk'] = key
        dump_pickle(config['pf'], batch_keys)
    return False


# 6) when the loop is globally finished, we do a commit and dump the dictionary
def batch_loop_else(key, batch_keys, config):
    """Does a commit and dumps the keys set when the for loop ran to completion.
    Must be used like this, in the else part of the for loop:
    batch_loop_else(key, batch_keys, config)
    :param key: last key (can be None)
    :param batch_keys: the treated keys set
    :param config: a config dict {'bn': batch_number, 'bl': last, 'cn': commit, 'll': loop_length, 'lc': loop_count,
                                  'pf': infile, 'ldk': last_dump_key}
    """
    if batch_keys is None:  # batching disabled
        return
    # avoid if already done on last key (batch_handle_key dumped it just before)
    already_dumped = config.get('ldk') is not None and config['ldk'] == key
    if key is None or already_dumped:
        return
    if config['cn']:
        transaction.commit()
    dump_pickle(config['pf'], batch_keys)
    logger.info("Batched %s / %s", len(batch_keys), config['ll'])
    if config['bl'] and not batch_globally_finished(batch_keys, config):
        logger.error('BATCHING MAYBE STOPPED TOO EARLY: %s / %s', len(batch_keys), config['ll'])


# 7) when all the items are treated, we can delete the dictionary file
def batch_delete_keys_file(batch_keys, config, rename=True):
    """Removes (or renames, for backup) the file containing the batched keys.
    :param batch_keys: the treated keys set
    :param config: a config dict {'bn': batch_number, 'bl': last, 'cn': commit, 'll': loop_length, 'lc': loop_count,
                                  'pf': infile}
    :param rename: do not delete but rename
    """
    if batch_keys is None:  # batching disabled: no file was ever written
        return
    pickle_file = config['pf']
    try:
        if not rename:
            os.remove(pickle_file)
        else:
            # keep a timestamped copy instead of deleting
            stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            os.rename(pickle_file, '{}.{}'.format(pickle_file, stamp))
    except Exception as error:
        # best-effort cleanup: log and continue
        logger.exception('Error while deleting the file %s: %s', pickle_file, error)


def batch_globally_finished(batch_keys, config):
    """Is the loop globally finished?

    Finished when the treated keys count reaches the loop length, or when the
    current run treated nothing new (lc == 0, meaning every item was skipped).
    :param batch_keys: the treated keys set (assumed not None — callers check first)
    :param config: a config dict {'bn': batch_number, 'bl': last, 'cn': commit, 'll': loop_length,
                   'lc': loop_count, 'pf': infile}
    :return: True if the loop is globally finished. False otherwise.
    """
    if config['ll']:
        # finished if the treated items number reaches the items to treat or if nothing else was treated
        return len(batch_keys) >= config['ll'] or config['lc'] == 0
    # unknown loop length: finished only when the current run treated nothing
    return config['lc'] == 0
165 changes: 165 additions & 0 deletions src/imio/helpers/tests/test_batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
from imio.helpers.batching import batch_delete_keys_file
from imio.helpers.batching import batch_get_keys
from imio.helpers.batching import batch_handle_key
from imio.helpers.batching import batch_loop_else
from imio.helpers.batching import batch_skip_key

import logging
import os
import transaction
import unittest


INFILE = 'keys.pkl'  # pickle file used by the batching module to persist treated keys
# global counters shared between loop_process and the fake transaction/logger hooks
processed = {'keys': [], 'commits': 0, 'errors': 0}


def loop_process(loop_len, batch_number, commit_number, a_set, last=False):
    """Runs a full batched loop over 1..loop_len using the batching module."""

    batch_keys, config = batch_get_keys(INFILE, batch_number, last, commit_number, loop_len, a_set=a_set)
    for idx in range(1, loop_len + 1):
        if batch_skip_key(idx, batch_keys, config):
            continue
        processed['keys'].append(idx)
        if batch_handle_key(idx, batch_keys, config):
            break
    else:
        # only pass the last key when something was treated (idx may be unbound on an empty loop,
        # but the conditional expression never evaluates it in that case since lc is then 0)
        batch_loop_else(idx if config['lc'] > 1 else None, batch_keys, config)
    if last:
        batch_delete_keys_file(batch_keys, config, rename=False)


def fake_transaction_commit():
    """Fake transaction commit: only counts how many commits were requested."""
    processed['commits'] = processed['commits'] + 1


def fake_logger_error(msg, *args, **kwargs):
    """Fake logger error: only counts emitted error messages."""
    processed['errors'] = processed['errors'] + 1


class TestBatching(unittest.TestCase):
    """Tests the batching module through successive loop_process runs."""

    def setUp(self):
        super(TestBatching, self).setUp()
        # start from a clean slate: a leftover keys file would alter the scenario
        if os.path.exists(INFILE):
            os.remove(INFILE)

    def test_batching(self):
        # patch transaction.commit and logger.error; addCleanup guarantees restoration
        # even when an assertion below fails (a manual restore at the end would be skipped)
        orig_tc_func = transaction.commit
        transaction.commit = fake_transaction_commit
        self.addCleanup(setattr, transaction, 'commit', orig_tc_func)
        logger = logging.getLogger('imio.helpers')
        orig_le_func = logger.error
        logger.error = fake_logger_error
        self.addCleanup(setattr, logger, 'error', orig_le_func)
        a_set = set()
        # no items
        loop_process(0, 0, 0, a_set)
        self.assertEqual(processed['keys'], [])
        self.assertEqual(processed['commits'], 0)
        self.assertSetEqual(a_set, set())
        self.assertFalse(os.path.exists(INFILE))
        # no batching
        loop_process(5, 0, 0, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 0)
        self.assertSetEqual(a_set, set())
        self.assertFalse(os.path.exists(INFILE))
        # no batching but commit used
        processed['keys'] = []
        processed['commits'] = 0
        loop_process(5, 0, 5, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 0)
        self.assertSetEqual(a_set, set())
        self.assertFalse(os.path.exists(INFILE))
        # batching: 2 passes with commit each item
        processed['keys'] = []
        processed['commits'] = 0
        loop_process(5, 3, 1, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3])
        self.assertEqual(processed['commits'], 3)
        self.assertSetEqual(a_set, {1, 2, 3})
        self.assertTrue(os.path.exists(INFILE))
        loop_process(5, 3, 1, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 5)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertTrue(os.path.exists(INFILE))
        # batching: 2 passes with commit each 3 items
        processed['keys'] = []
        processed['commits'] = 0
        a_set = set()
        os.remove(INFILE)
        loop_process(5, 3, 3, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3])
        self.assertEqual(processed['commits'], 1)
        self.assertSetEqual(a_set, {1, 2, 3})
        self.assertTrue(os.path.exists(INFILE))
        loop_process(5, 3, 3, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 2)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertTrue(os.path.exists(INFILE))
        loop_process(5, 3, 3, a_set, last=True)  # what if one more call ?
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 2)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertFalse(os.path.exists(INFILE))
        # batching: 1 pass with commit each 3 items
        processed['keys'] = []
        processed['commits'] = 0
        a_set = set()
        loop_process(5, 10, 3, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 2)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertTrue(os.path.exists(INFILE))
        loop_process(5, 10, 3, a_set, last=True)  # what if one more call ?
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 2)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertFalse(os.path.exists(INFILE))
        # batching: 1 pass with commit greater than items length
        processed['keys'] = []
        processed['commits'] = 0
        a_set = set()
        loop_process(5, 10, 10, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 1)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertTrue(os.path.exists(INFILE))
        loop_process(5, 10, 10, a_set, last=True)  # what if one more call ?
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 1)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertFalse(os.path.exists(INFILE))
        # batching: 1 pass with 1 commit
        processed['keys'] = []
        processed['commits'] = 0
        a_set = set()
        loop_process(5, 5, 5, a_set)
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 1)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertTrue(os.path.exists(INFILE))
        loop_process(5, 5, 5, a_set, last=True)  # what if one more call ?
        self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
        self.assertEqual(processed['commits'], 1)
        self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
        self.assertFalse(os.path.exists(INFILE))
        # batching: finishing too early
        processed['keys'] = []
        processed['commits'] = 0
        a_set = set()
        loop_process(5, 3, 5, a_set, last=True)
        self.assertEqual(processed['keys'], [1, 2, 3])
        self.assertEqual(processed['commits'], 1)
        self.assertEqual(processed['errors'], 1)  # finished too early
        self.assertSetEqual(a_set, {1, 2, 3})
        self.assertFalse(os.path.exists(INFILE))
3 changes: 0 additions & 3 deletions test-4.3.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,3 @@ file-read-backwards = 2.0.0
# Required by:
# Pillow==4.0.0
olefile = 0.46

# Added by buildout at 2023-11-28 16:14:08.561451
imio.pyutils = 1.0.0a0

0 comments on commit 1d47b89

Please sign in to comment.