Feature/efficient task lock (#162)
* implement lock at TaskOnKart.run()

* add dump unlock

* add redis_fail

* add comment

* add readme

* fix review

* move description to docs

* fix conflict
mski-iksm authored Jan 20, 2021
1 parent 6088284 commit 6fd56ab
Showing 9 changed files with 278 additions and 72 deletions.
34 changes: 0 additions & 34 deletions README.md
@@ -98,40 +98,6 @@ def run(self):
```

## Advanced
### Task cache collision lock
#### Require
You need to install [redis](https://redis.io/topics/quickstart) for this advanced function.

#### Description
Task lock is implemented to prevent task cache collision.
(Originally, task cache collision may occur when same task with same parameters run at different applications parallelly.)

1. Set up a redis server at somewhere accessible from gokart/luigi jobs.

Following will run redis at your localhost.

```bash
$ redis-server
```

2. Set redis server hostname and port number as parameters to gokart.TaskOnKart().

You can set it by adding `--redis-host=[your-redis-localhost] --redis-port=[redis-port-number]` options to gokart python script.

e.g.
```bash
python main.py sample.SomeTask --local-scheduler --redis-host=localhost --redis-port=6379
```

Alternatively, you may set parameters at config file.

```conf.ini
[TaskOnKart]
redis_host=localhost
redis_port=6379
```

### Inherit task parameters with decorator
#### Description
```python
91 changes: 91 additions & 0 deletions docs/using_task_cache_collision_lock.rst
@@ -0,0 +1,91 @@
1. Task cache collision lock
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Requires
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You need to install `redis <https://redis.io/topics/quickstart>`_ for this
advanced function.

Description
^^^^^^^^^^^

The task lock prevents task cache collisions. (Without it, a cache
collision may occur when the same task with the same parameters runs
in different applications in parallel.)

1. Set up a redis server somewhere accessible from gokart/luigi jobs.

   The following will run redis on your localhost.

   .. code:: bash

       $ redis-server

2. Set the redis server hostname and port number as parameters to gokart.TaskOnKart().

   You can set them by adding the ``--redis-host=[your-redis-localhost] --redis-port=[redis-port-number]`` options to the gokart python script.

   e.g.

   .. code:: bash

       python main.py sample.SomeTask --local-scheduler --redis-host=localhost --redis-port=6379

   Alternatively, you may set the parameters in a config file.

   .. code::

       [TaskOnKart]
       redis_host=localhost
       redis_port=6379

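As a rough illustration of the config-file route, the sketch below parses the same `[TaskOnKart]` section with Python's standard `configparser`. This is not how luigi loads its configuration; it only shows that both values arrive as strings, consistent with the `str` annotations on `redis_host` and `redis_port` in `RedisParams`. The name `CONFIG_TEXT` is an assumption made for this example.

```python
import configparser

# Hypothetical in-memory copy of the config file shown above.
CONFIG_TEXT = """
[TaskOnKart]
redis_host=localhost
redis_port=6379
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

section = parser["TaskOnKart"]
redis_host = section.get("redis_host")  # None if the key is absent
redis_port = section.get("redis_port")  # stays a string, not an int

# Mirrors the check in make_redis_params: locking is on only when both are set.
should_redis_lock = redis_host is not None and redis_port is not None
print(redis_host, redis_port, should_redis_lock)
```

If either key were missing, `section.get` would return `None` and the lock check would be deactivated, which matches the parameter descriptions in `TaskOnKart`.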
2. Using efficient task cache collision lock
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


Description
^^^^^^^^^^^

The task lock above prevents cache collisions. However, that setting checks for collisions only when the task accesses the cache file (i.e. ``task.dump()``, ``task.load()`` and ``task.remove()``). Applications can therefore still execute ``run()`` of the same task at the same time, which is not efficient.

The settings in this section prevent ``run()`` of the same task from executing at the same time, for efficiency.

1. Set the normal cache collision lock.

   Set up the cache collision lock by following ``1. Task cache collision lock``.

2. Decorate ``run()`` with ``@RunWithLock``.

   Decorate ``run()`` of each gokart task that you want to lock with ``@RunWithLock``.

   .. code:: python

       import gokart
       from gokart.run_with_lock import RunWithLock

       class SomeTask(gokart.TaskOnKart):
           @RunWithLock
           def run(self):
               ...  # task body goes here

3. Set the ``redis_fail_on_collision`` parameter to true.

   This parameter controls what happens when the task's lock is held by another application. With ``redis_fail_on_collision=True``, the task fails immediately if its lock is held by another application, so the locked task is skipped and other independent tasks are done first. With ``redis_fail_on_collision=False``, the task waits until the other application releases the lock.

   The parameter can be set in a config file.

   .. code::

       [TaskOnKart]
       redis_host=localhost
       redis_port=6379
       redis_fail_on_collision=true

4. Set retry parameters.

   Set the following parameters so that a failed task is retried. ``retry_count`` and ``retry_delay`` can be set to any values that suit your situation.

   ::

       [scheduler]
       retry_count=10000
       retry_delay=10

       [worker]
       keep_alive=true
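The "fail now, retry later" behaviour described in steps 3 and 4 can be pictured with a toy loop. This is only a sketch under stated assumptions: luigi's scheduler handles retries through `retry_count` and `retry_delay`, and nothing here calls luigi. The function `run_all` and its `locked_rounds` argument are hypothetical names invented for the illustration.

```python
class TaskLockException(Exception):
    """Stand-in for gokart.redis_lock.TaskLockException."""


def run_all(tasks, locked_rounds, max_retries=3):
    """Run task names in order, retrying the ones that hit a held lock.

    locked_rounds maps a task name to the number of attempts during which
    another (imaginary) application still holds its lock.
    """
    finished = []
    pending = list(tasks)
    attempts = {name: 0 for name in tasks}
    while pending:
        retry_later = []
        for name in pending:
            attempts[name] += 1
            try:
                if attempts[name] <= locked_rounds.get(name, 0):
                    raise TaskLockException(f'{name} is locked')
                finished.append(name)
            except TaskLockException:
                if attempts[name] >= max_retries:
                    raise  # give up once the retry budget is spent
                retry_later.append(name)
        pending = retry_later
    return finished


# 'A' is locked on its first attempt, so 'B' and 'C' finish before it.
order = run_all(['A', 'B', 'C'], locked_rounds={'A': 1})
print(order)
```

With `redis_fail_on_collision=False` the picture would differ: task `A` would simply block until the lock is free, and nothing would be reordered.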
24 changes: 18 additions & 6 deletions gokart/redis_lock.py
@@ -7,15 +7,18 @@

logger = getLogger(__name__)

# TODO: Codes of this file should be implemented to gokart


class RedisParams(NamedTuple):
    redis_host: str
    redis_port: str
    redis_timeout: int
    redis_key: str
    should_redis_lock: bool
    redis_fail_on_collision: bool


class TaskLockException(Exception):
    pass


class RedisClient:
@@ -43,8 +46,10 @@ def with_lock(func, redis_params: RedisParams):

    def wrapper(*args, **kwargs):
        redis_client = RedisClient(host=redis_params.redis_host, port=redis_params.redis_port).get_redis_client()
        redis_lock = redis.lock.Lock(redis=redis_client, name=redis_params.redis_key, timeout=redis_params.redis_timeout, blocking=True, thread_local=False)
        redis_lock.acquire()
        blocking = not redis_params.redis_fail_on_collision
        redis_lock = redis.lock.Lock(redis=redis_client, name=redis_params.redis_key, timeout=redis_params.redis_timeout, thread_local=False)
        if not redis_lock.acquire(blocking=blocking):
            raise TaskLockException('Lock already taken by other task.')

        def extend_lock():
            redis_lock.extend(additional_time=redis_params.redis_timeout, replace_ttl=True)
@@ -62,6 +67,7 @@ def extend_lock():
            return result
        except BaseException as e:
            logger.debug(f'Task lock of {redis_params.redis_key} released with BaseException.')
            redis_lock.release()
            scheduler.shutdown()
            raise e
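The acquire logic in this hunk can be tried out without a redis server. The sketch below swaps in `threading.Lock` for `redis.lock.Lock`, on the assumption that both expose `acquire(blocking=...)` returning a bool and `release()`. It leaves out the scheduler that extends the lock, and it uses `try`/`finally` rather than the `except BaseException` branch of the real wrapper.

```python
import threading


class TaskLockException(Exception):
    """Stand-in for the exception added in this commit."""


def with_lock(func, lock, fail_on_collision):
    """Wrap func so that it only runs while holding lock."""
    def wrapper(*args, **kwargs):
        # Non-blocking acquire when the caller prefers to fail fast.
        blocking = not fail_on_collision
        if not lock.acquire(blocking=blocking):
            raise TaskLockException('Lock already taken by other task.')
        try:
            return func(*args, **kwargs)
        finally:
            # Released on success and on any exception alike.
            lock.release()
    return wrapper


shared_lock = threading.Lock()
guarded = with_lock(lambda x: x * 2, shared_lock, fail_on_collision=True)

result_when_free = guarded(21)

shared_lock.acquire()  # pretend another application holds the lock
try:
    guarded(21)
    collided = False
except TaskLockException:
    collided = True
finally:
    shared_lock.release()
```

Passing `fail_on_collision=False` would make the second call wait for the lock, which is the behaviour before this commit.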

@@ -73,12 +79,18 @@ def make_redis_key(file_path: str, unique_id: str):
    return f'{basename_without_ext}_{unique_id}'


def make_redis_params(file_path: str, unique_id: str, redis_host: str, redis_port: str, redis_timeout: int):
def make_redis_params(file_path: str,
                      unique_id: str,
                      redis_host: str = None,
                      redis_port: str = None,
                      redis_timeout: int = None,
                      redis_fail_on_collision: bool = False):
    redis_key = make_redis_key(file_path, unique_id)
    should_redis_lock = redis_host is not None and redis_port is not None
    redis_params = RedisParams(redis_host=redis_host,
                               redis_port=redis_port,
                               redis_key=redis_key,
                               should_redis_lock=should_redis_lock,
                               redis_timeout=redis_timeout)
                               redis_timeout=redis_timeout,
                               redis_fail_on_collision=redis_fail_on_collision)
    return redis_params
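A self-contained sketch of the new defaults may help show why callers such as `make_target` can now omit the redis arguments. The body of `make_redis_key` is an assumption: the diff only shows its return statement, so the base-name computation below is a guess made for illustration.

```python
import os
from typing import NamedTuple, Optional


class RedisParams(NamedTuple):
    redis_host: Optional[str]
    redis_port: Optional[str]
    redis_timeout: Optional[int]
    redis_key: str
    should_redis_lock: bool
    redis_fail_on_collision: bool


def make_redis_key(file_path: str, unique_id: Optional[str]) -> str:
    # Assumption: base name without extension, joined with the unique id.
    basename_without_ext = os.path.splitext(os.path.basename(file_path))[0]
    return f'{basename_without_ext}_{unique_id}'


def make_redis_params(file_path, unique_id, redis_host=None, redis_port=None,
                      redis_timeout=None, redis_fail_on_collision=False):
    return RedisParams(redis_host=redis_host,
                       redis_port=redis_port,
                       redis_timeout=redis_timeout,
                       redis_key=make_redis_key(file_path, unique_id),
                       # Locking is active only when both host and port are given.
                       should_redis_lock=redis_host is not None and redis_port is not None,
                       redis_fail_on_collision=redis_fail_on_collision)


unlocked = make_redis_params('workspace/sample/SomeTask.pkl', 'abc123')
locked = make_redis_params('workspace/sample/SomeTask.pkl', 'abc123',
                           redis_host='localhost', redis_port='6379',
                           redis_timeout=180, redis_fail_on_collision=True)
```

With no host and port, `should_redis_lock` is false, so `with_lock` would be expected to leave the wrapped function unlocked.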
25 changes: 25 additions & 0 deletions gokart/run_with_lock.py
@@ -0,0 +1,25 @@
from functools import partial

import luigi


class RunWithLock:
    def __init__(self, func):
        self._func = func

    def __call__(self, instance):
        instance._lock_at_dump = False
        output_list = luigi.task.flatten(instance.output())
        return self._run_with_lock(partial(self._func, self=instance), output_list)

    def __get__(self, instance, owner_class):
        return partial(self.__call__, instance)

    @classmethod
    def _run_with_lock(cls, func, output_list: list):
        if len(output_list) == 0:
            return func()

        output = output_list.pop()
        wrapped_func = output.wrap_with_lock(func)
        return cls._run_with_lock(func=wrapped_func, output_list=output_list)
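To see how the descriptor and the recursive wrapping fit together, the sketch below reuses the class with two changes: `luigi.task.flatten` is replaced by a plain `list(...)`, and the targets are hypothetical `FakeTarget` objects that record lock events rather than talking to redis. `SomeTask` here is a plain class, not a `TaskOnKart`.

```python
from functools import partial


class FakeTarget:
    """Hypothetical target whose wrap_with_lock only records lock events."""

    def __init__(self, name, events):
        self.name = name
        self.events = events

    def wrap_with_lock(self, func):
        def wrapper(*args, **kwargs):
            self.events.append(f'acquire {self.name}')
            try:
                return func(*args, **kwargs)
            finally:
                self.events.append(f'release {self.name}')
        return wrapper


class RunWithLock:
    def __init__(self, func):
        self._func = func

    def __call__(self, instance):
        instance._lock_at_dump = False
        output_list = list(instance.output())  # luigi.task.flatten in the real code
        return self._run_with_lock(partial(self._func, self=instance), output_list)

    def __get__(self, instance, owner_class):
        # Makes the decorated attribute behave like a bound method.
        return partial(self.__call__, instance)

    @classmethod
    def _run_with_lock(cls, func, output_list):
        if len(output_list) == 0:
            return func()
        output = output_list.pop()
        wrapped_func = output.wrap_with_lock(func)
        return cls._run_with_lock(func=wrapped_func, output_list=output_list)


class SomeTask:
    def __init__(self):
        self.events = []
        self._lock_at_dump = True

    def output(self):
        return [FakeTarget('first', self.events), FakeTarget('second', self.events)]

    @RunWithLock
    def run(self):
        self.events.append('run')
        return 'done'


task = SomeTask()
result = task.run()
print(task.events)
```

Each output wraps the function built so far, so the last target popped ends up as the outermost lock, and every lock is held for the whole of `run()`.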
32 changes: 14 additions & 18 deletions gokart/target.py
@@ -26,21 +26,24 @@ def exists(self) -> bool:
        return self._exists()

    def load(self) -> Any:
        return self._with_lock(self._load)()
        return self.wrap_with_lock(self._load)()

    def dump(self, obj) -> None:
        self._with_lock(self._dump)(obj)
    def dump(self, obj, lock_at_dump: bool = True) -> None:
        if lock_at_dump:
            self.wrap_with_lock(self._dump)(obj)
        else:
            self._dump(obj)

    def remove(self) -> None:
        return self._with_lock(self._remove)()
        return self.wrap_with_lock(self._remove)()

    def last_modification_time(self) -> datetime:
        return self._last_modification_time()

    def path(self) -> str:
        return self._path()

    def _with_lock(self, func):
    def wrap_with_lock(self, func):
        return with_lock(func=func, redis_params=self._get_redis_params())

    @abstractmethod
@@ -211,32 +214,25 @@ def _get_last_modification_time(path: str) -> datetime:
    return datetime.fromtimestamp(os.path.getmtime(path))


def make_target(file_path: str,
                unique_id: Optional[str] = None,
                processor: Optional[FileProcessor] = None,
                redis_host: str = None,
                redis_port: str = None,
                redis_timeout: int = 180) -> TargetOnKart:
    redis_params = make_redis_params(file_path=file_path, unique_id=unique_id, redis_host=redis_host, redis_port=redis_port, redis_timeout=redis_timeout)
def make_target(file_path: str, unique_id: Optional[str] = None, processor: Optional[FileProcessor] = None, redis_params: RedisParams = None) -> TargetOnKart:
    _redis_params = redis_params if redis_params is not None else make_redis_params(file_path=file_path, unique_id=unique_id)
    file_path = _make_file_path(file_path, unique_id)
    processor = processor or make_file_processor(file_path)
    file_system_target = _make_file_system_target(file_path, processor=processor)
    return SingleFileTarget(target=file_system_target, processor=processor, redis_params=redis_params)
    return SingleFileTarget(target=file_system_target, processor=processor, redis_params=_redis_params)


def make_model_target(file_path: str,
                      temporary_directory: str,
                      save_function,
                      load_function,
                      unique_id: Optional[str] = None,
                      redis_host: str = None,
                      redis_port: str = None,
                      redis_timeout: int = 180) -> TargetOnKart:
    redis_params = make_redis_params(file_path=file_path, unique_id=unique_id, redis_host=redis_host, redis_port=redis_port, redis_timeout=redis_timeout)
                      redis_params: RedisParams = None) -> TargetOnKart:
    _redis_params = redis_params if redis_params is not None else make_redis_params(file_path=file_path, unique_id=unique_id)
    file_path = _make_file_path(file_path, unique_id)
    temporary_directory = os.path.join(temporary_directory, hashlib.md5(file_path.encode()).hexdigest())
    return ModelTarget(file_path=file_path,
                       temporary_directory=temporary_directory,
                       save_function=save_function,
                       load_function=load_function,
                       redis_params=redis_params)
                       redis_params=_redis_params)
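The new `lock_at_dump` switch on `dump()` can be exercised with a small stand-in. `FakeTarget` below is hypothetical: it counts how often a lock would be taken rather than calling `with_lock`, and it stores the object in memory rather than writing a file.

```python
class FakeTarget:
    """Hypothetical stand-in for TargetOnKart that counts lock acquisitions."""

    def __init__(self):
        self.lock_count = 0
        self.stored = None

    def wrap_with_lock(self, func):
        def wrapper(*args, **kwargs):
            self.lock_count += 1  # a real target would take the redis lock here
            return func(*args, **kwargs)
        return wrapper

    def _dump(self, obj):
        self.stored = obj

    def dump(self, obj, lock_at_dump=True):
        # Same branching as the new TargetOnKart.dump in this hunk.
        if lock_at_dump:
            self.wrap_with_lock(self._dump)(obj)
        else:
            self._dump(obj)


target = FakeTarget()
target.dump('a')                      # default: locked write
target.dump('b', lock_at_dump=False)  # caller already holds the lock
```

Only the first call goes through `wrap_with_lock`. `load()` and `remove()` keep locking unconditionally in this commit.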
37 changes: 27 additions & 10 deletions gokart/task.py
@@ -1,3 +1,4 @@
from functools import partial
import hashlib
import os
import sys
@@ -14,6 +15,7 @@
from gokart.pandas_type_config import PandasTypeConfigMap
from gokart.parameter import TaskInstanceParameter, ListTaskInstanceParameter
from gokart.target import TargetOnKart
from gokart.redis_lock import make_redis_params

logger = getLogger(__name__)

@@ -52,6 +54,10 @@ class TaskOnKart(luigi.Task):
    redis_host = luigi.Parameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
    redis_port = luigi.Parameter(default=None, description='Task lock check is deactivated, when None.', significant=False)
    redis_timeout = luigi.IntParameter(default=180, description='Redis lock will be released after `redis_timeout` seconds', significant=False)
    redis_fail_on_collision: bool = luigi.BoolParameter(
        default=False,
        description='True for failing the task immediately when the cache is locked, instead of waiting for the lock to be released',
        significant=False)
    fail_on_empty_dump: bool = gokart.ExplicitBoolParameter(default=False, description='Fail when task dumps empty DF', significant=False)

    def __init__(self, *args, **kwargs):
@@ -61,6 +67,7 @@ def __init__(self, *args, **kwargs):
        self.task_unique_id = None
        super(TaskOnKart, self).__init__(*args, **kwargs)
        self._rerun_state = self.rerun
        self._lock_at_dump = True

    def output(self):
        return self.make_target()
@@ -137,26 +144,32 @@ def make_target(self, relative_file_path: str = None, use_unique_id: bool = True
                                                                                                              f"{type(self).__name__}.pkl")
        file_path = os.path.join(self.workspace_directory, formatted_relative_file_path)
        unique_id = self.make_unique_id() if use_unique_id else None
        return gokart.target.make_target(file_path=file_path,

        redis_params = make_redis_params(file_path=file_path,
                                         unique_id=unique_id,
                                         processor=processor,
                                         redis_host=self.redis_host,
                                         redis_port=self.redis_port,
                                         redis_timeout=self.redis_timeout)
                                         redis_timeout=self.redis_timeout,
                                         redis_fail_on_collision=self.redis_fail_on_collision)
        return gokart.target.make_target(file_path=file_path, unique_id=unique_id, processor=processor, redis_params=redis_params)

    def make_large_data_frame_target(self, relative_file_path: str = None, use_unique_id: bool = True, max_byte=int(2**26)) -> TargetOnKart:
        formatted_relative_file_path = relative_file_path if relative_file_path is not None else os.path.join(self.__module__.replace(".", "/"),
                                                                                                              f"{type(self).__name__}.zip")
        file_path = os.path.join(self.workspace_directory, formatted_relative_file_path)
        unique_id = self.make_unique_id() if use_unique_id else None
        redis_params = make_redis_params(file_path=file_path,
                                         unique_id=unique_id,
                                         redis_host=self.redis_host,
                                         redis_port=self.redis_port,
                                         redis_timeout=self.redis_timeout,
                                         redis_fail_on_collision=self.redis_fail_on_collision)
        return gokart.target.make_model_target(file_path=file_path,
                                               temporary_directory=self.local_temporary_directory,
                                               unique_id=unique_id,
                                               save_function=gokart.target.LargeDataFrameProcessor(max_byte=max_byte).save,
                                               load_function=gokart.target.LargeDataFrameProcessor.load,
                                               redis_host=self.redis_host,
                                               redis_port=self.redis_port,
                                               redis_timeout=self.redis_timeout)
                                               redis_params=redis_params)

    def make_model_target(self,
                          relative_file_path: str,
@@ -174,14 +187,18 @@ def make_model_target(self,
        file_path = os.path.join(self.workspace_directory, relative_file_path)
        assert relative_file_path[-3:] == 'zip', f'extension must be zip, but {relative_file_path} is passed.'
        unique_id = self.make_unique_id() if use_unique_id else None
        redis_params = make_redis_params(file_path=file_path,
                                         unique_id=unique_id,
                                         redis_host=self.redis_host,
                                         redis_port=self.redis_port,
                                         redis_timeout=self.redis_timeout,
                                         redis_fail_on_collision=self.redis_fail_on_collision)
        return gokart.target.make_model_target(file_path=file_path,
                                               temporary_directory=self.local_temporary_directory,
                                               unique_id=unique_id,
                                               save_function=save_function,
                                               load_function=load_function,
                                               redis_host=self.redis_host,
                                               redis_port=self.redis_port,
                                               redis_timeout=self.redis_timeout)
                                               redis_params=redis_params)

    def load(self, target: Union[None, str, TargetOnKart] = None) -> Any:
        def _load(targets):
@@ -230,7 +247,7 @@ def dump(self, obj, target: Union[None, str, TargetOnKart] = None) -> None:
        PandasTypeConfigMap().check(obj, task_namespace=self.task_namespace)
        if self.fail_on_empty_dump and isinstance(obj, pd.DataFrame):
            assert not obj.empty
        self._get_output_target(target).dump(obj)
        self._get_output_target(target).dump(obj, lock_at_dump=self._lock_at_dump)

    def make_unique_id(self):
        self.task_unique_id = self.task_unique_id or self._make_hash_id()
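One plausible reason for threading `_lock_at_dump` down to `dump()`, not stated in the commit itself, is that a lock already held for the whole `run()` cannot be taken a second time from inside it. The sketch below shows that effect with a non-reentrant `threading.Lock` standing in for the redis lock on one key. The helper names `dump` and `run_with_lock` are hypothetical and are not the gokart functions.

```python
import threading

lock = threading.Lock()  # non-reentrant, standing in for a redis lock on one key


def dump(obj, store, lock_at_dump=True):
    """Append obj to store, optionally taking the lock first."""
    if lock_at_dump:
        if not lock.acquire(blocking=False):
            raise RuntimeError('lock is already held')
        try:
            store.append(obj)
        finally:
            lock.release()
    else:
        store.append(obj)


def run_with_lock(lock_at_dump):
    """Hold the lock for the whole run, then dump inside it."""
    store = []
    with lock:
        dump('result', store, lock_at_dump=lock_at_dump)
    return store


ok = run_with_lock(lock_at_dump=False)

try:
    run_with_lock(lock_at_dump=True)
    second_acquire_failed = False
except RuntimeError:
    second_acquire_failed = True
```

In the blocking case (`redis_fail_on_collision=False`) the second acquire would presumably wait rather than fail, so skipping the lock at dump time avoids that wait as well.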