Skip to content

Commit

Permalink
Added functionality to attach multiple processes to the same experime…
Browse files Browse the repository at this point in the history
…nt (#193)

All processes are being able to write to the database tables of the same experiment.
  • Loading branch information
LukasFehring authored Jun 12, 2024
1 parent 0cfb381 commit 2fed0ca
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 6 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Feature

- Added documentation about how to execute PyExperimenter on distributed machines.
- Improved the usage and documentation of ssh tunnel to be more flexible and user friendly.
- Add add_experiment_and_execute method to PyExperimenter to add and execute an experiment in one step.
- Added add_experiment_and_execute method to PyExperimenter to add and execute an experiment in one step.
- Added functionality to attach multiple processes to the same experiment, all being able to write to the database tables of the same experiment.

Fix
---
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/example_logtables.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.0"
"version": "3.9.19"
},
"orig_nbformat": 4,
"vscode": {
Expand Down
2 changes: 1 addition & 1 deletion docs/source/usage/distributed_execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ and then add the configured experiments experiments in the worker job, followed
)
experimenter.add_experiment_and_execute(keyfield_values_from_config, _experiment_function)
.. _hydra_submitit: https://hydra.cc/docs/plugins/submitit_launcher/
.. _hydra_submitit: https://hydra.cc/docs/plugins/submitit_launcher/
54 changes: 54 additions & 0 deletions docs/source/usage/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,60 @@ Instead of filling the database table with rows and then executing the experimen
This function may be useful in case of dependencies, where the result of one experiment is needed to configure the next one, or if the experiments are supposed to be configured with software such as `Hydra <hydra_>`_.

.. _attach:

----------------------------
Attach to Running Experiment
----------------------------

For cases of multiprocessing, where the ``experiment_function`` contains a main job, that runs multiple additional workers in other processes (maybe on a different machine), it is inconvenient to log all information through the main job. Therefore, we allow these workers to also attach to the database and log their information about the same experiment.

First, a worker experiment function wrapper has to be defined, which handles the parallel execution of something in a different process. The actual worker experiment function is defined inside the wrapper. The worker function is then attached to the experiment and logs its information on its own. In case more arguments are needed within the worker function, they can be passed to the wrapper function as keyword arguments.

.. code-block:: python
def worker_experiment_function_wrapper(experiment_id: int, **kwargs):
def worker_experiment_function(result_processor: ResultProcessor):
# Worker Experiment Execution
result = do_something_else()
result_processor.process_logs(
# Some Logs
)
return result
return experimenter.attach(worker_experiment_function, experiment_id)
.. note::

The ``experimenter.attach`` function returns the result of ``worker_experiment_function``.

Second, the main experiment function has to be defined calling the above created wrapper, which is provided with the ``experiment_id`` and started in a different process:

.. code-block:: python
def main_experiment_function(keyfields: dict, result_processor: ResultProcessor, custom_fields: Dict):
# Main Experiment Execution
do_something()
# Start worker in different process, and provide it with the experiment_id
result = worker_experiment_function_wrapper(result_processor.experiment_id)
# Compute Something
do_more()
result_processor.process_results(
# Results
)
Afterwards, the main experiment function can be started as usual:

.. code-block:: python
experimenter.execute(main_experiment_function, max_experiments=-1)
.. _reset_experiments:

Expand Down
8 changes: 6 additions & 2 deletions py_experimenter/database_connector_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,18 @@ def start_ssh_tunnel(self, logger: Logger):
tunnel = self.get_ssh_tunnel(logger)
# Tunnels may not be stopepd instantly, so we check if the tunnel is active before starting it
if tunnel is not None and not tunnel.is_active:
tunnel.start()
try:
tunnel.start()
except Exception as e:
logger.warning("Failed at creating SSH tunnel. Maybe the tunnel is already active in other process?")
logger.warning(e)

def close_ssh_tunnel(self):
if not self.database_configuration.use_ssh_tunnel:
self.logger.warning("Attempt to close SSH tunnel, but ssh tunnel is not used.")
tunnel = self.get_ssh_tunnel(self.logger)
if tunnel is not None:
tunnel.stop(force=True)
tunnel.stop(force=False)

def _test_connection(self):
try:
Expand Down
22 changes: 22 additions & 0 deletions py_experimenter/experimenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,28 @@ def unpause_experiment(self, experiment_id: int, experiment_function: Callable)

self._delete_codecarbon_config()

def attach(self, experiment_function: Callable, experiment_id:int) -> None:
"""
Executes the given `experiment_function` on the allready running experiment.
Note that the `experiment_function` signature differs from the `experiment_function` used in the `execute` method!
:param experiment_function: The function that should be executed..
:type own_function: Callable
:param experiment_id: The id of the experiment to be executed.
:type experiment_id: int
:raises ValueError: If the database provider is sqlite.
"""
if self.config.database_configuration.provider == "sqlite":
raise ValueError("Attaching is not supported for sqlite databases")
if self.use_codecarbon:
self.logger.warning(
"CodeCarbon is not supported for attached functions. Therefore the emissions are only tracked for the main experiment."
)

result_processor = ResultProcessor(self.config.database_configuration, self.db_connector, experiment_id=experiment_id, logger=self.logger)

return experiment_function(result_processor)

def _worker(self, experiment_function: Callable[[Dict, Dict, ResultProcessor], None], random_order: bool) -> None:
"""
Worker that repeatedly pulls open experiments from the database table and executes them.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "py-experimenter"
version = "1.4.2a1"
version = "1.4.2a2"
description = "The PyExperimenter is a tool for the automatic execution of experiments, e.g. for machine learning (ML), capturing corresponding results in a unified manner in a database."
authors = [
"Tanja Tornede <t.tornede@ai.uni-hannover.de>",
Expand Down

0 comments on commit 2fed0ca

Please sign in to comment.