
Commit

Merge pull request #198 from tornede/develop
tornede authored Jun 13, 2024
2 parents 6f1b654 + 06d0884 commit 1751e31
Showing 20 changed files with 1,011 additions and 557 deletions.
6 changes: 6 additions & 0 deletions .vscode/settings.json
@@ -62,4 +62,10 @@
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
"grammarly.selectors": [
{
"language": "restructuredtext",
"scheme": "file"
}
],
}
13 changes: 13 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,19 @@
Changelog
=========


v1.4.2 (12.06.2024)
===================

Feature
-------

- Added documentation about how to execute ``PyExperimenter`` on distributed machines.
- Improved the usage and documentation of the SSH tunnel to be more flexible and user-friendly.
- Added the ``add_experiment_and_execute`` method to ``PyExperimenter``, which adds and executes an experiment in one step.
- Added functionality to attach multiple processes to the same experiment, allowing all of them to write to that experiment's database tables.


v1.4.1 (11.03.2024)
===================

1,042 changes: 583 additions & 459 deletions docs/source/examples/example_general_usage.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/source/examples/example_logtables.ipynb
@@ -1390,7 +1390,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.0"
"version": "3.9.19"
},
"orig_nbformat": 4,
"vscode": {
2 changes: 1 addition & 1 deletion docs/source/usage/database_credential_file.rst
@@ -19,7 +19,7 @@ Below is an example of a database credential file, that connects to a server wit
server: example.mysqlserver.com
However, for security reasons, databases might only be accessible from a specific IP address. In these cases, one can use an SSH jumphost. This means that ``PyExperimenter`` will first connect to the SSH server
that has access to the database and then connect to the database server from there. This is done by adding an additional ``Ssh`` section to the database credential file.
that has access to the database and then connect to the database server from there. This is done by adding an additional ``Ssh`` section to the database credential file, and the tunnel can be activated either via a ``PyExperimenter`` keyword argument or in the :ref:`experiment configuration file <experiment_configuration_file>`.
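
For illustration, the keyword-argument variant might look like the following minimal sketch; the argument name ``use_ssh_tunnel`` is an assumption mirroring the configuration key of the same name, so consult the API reference for the exact signature.

.. code-block:: python

    from py_experimenter.experimenter import PyExperimenter

    # Hedged sketch: the use_ssh_tunnel keyword argument is assumed to mirror the
    # configuration key of the same name and to open the tunnel on instantiation.
    experimenter = PyExperimenter(
        experiment_configuration_file_path = "path/to/file",
        database_credential_file_path = "path/to/file",
        use_ssh_tunnel = True
    )
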
The following example shows how to connect to a database server using an SSH server with the address ``ssh_hostname`` and the port ``optional_ssh_port``.

.. code-block:: yaml
81 changes: 81 additions & 0 deletions docs/source/usage/distributed_execution.rst
@@ -0,0 +1,81 @@
.. _distributed_execution:

=====================
Distributed Execution
=====================
To distribute the execution of experiments across multiple machines, you can follow the standard :ref:`procedure of using PyExperimenter <execution>`, with the following additional considerations.

--------------
Database Setup
--------------
You need to have a shared database that is accessible to all the machines and supports concurrent access. Thus, ``SQLite`` is not a good choice for this purpose, which is why we recommend using a ``MySQL`` database instead.

--------
Workflow
--------
While it is theoretically possible for multiple jobs to create new experiments, this introduces the risk of creating the same experiment multiple times. To prevent this, we recommend the following workflow, in which a process acts either as the ``database handler``, responsible for creating/resetting experiments, or as an ``experiment executor``, actually executing experiments.

.. note::

    Make sure to use the same :ref:`experiment configuration file <experiment_configuration_file>` and :ref:`database credential file <database_credential_file>` for both types of processes.


Database Handling
-----------------

The ``database handler`` process creates/resets the experiments and stores them in the database once in advance.

.. code-block:: python

    from py_experimenter.experimenter import PyExperimenter

    experimenter = PyExperimenter(
        experiment_configuration_file_path = "path/to/file",
        database_credential_file_path = "path/to/file"
    )
    experimenter.fill_table_from_config()

Experiment Execution
--------------------

Multiple ``experiment executor`` processes execute the experiments in parallel on different machines, all using the same code. In a typical HPC context, each job starts a single ``experiment executor`` process on a different node.

.. code-block:: python

    from py_experimenter.experimenter import PyExperimenter

    experimenter = PyExperimenter(experiment_configuration_file_path = "path/to/file", database_credential_file_path = "path/to/file")
    experimenter.execute(experiment_function, max_experiments=1)

Add Experiment and Execute
--------------------------

When executing jobs on clusters, one might want to use `hydra combined with submitit <hydra_submitit_>`_ or similar software that configures the individual jobs. If so, it makes sense to create the database table once initially

.. code-block:: python

    ...
    experimenter = PyExperimenter(
        experiment_configuration_file_path = "path/to/file",
        database_credential_file_path = "path/to/file"
    )
    experimenter.create_table()

and then add the configured experiments in the worker job, followed by an immediate execution.

.. code-block:: python

    def _experiment_function(keyfields: dict, result_processor: ResultProcessor, custom_fields: dict):
        ...

    ...

    @hydra.main(config_path="config", config_name="hydra_configname", version_base="1.1")
    def experiment_wrapper(config: Configuration):
        ...
        experimenter = PyExperimenter(
            experiment_configuration_file_path = "some/value/from/config",
            database_credential_file_path = "path/to/file"
        )
        experimenter.add_experiment_and_execute(keyfield_values_from_config, _experiment_function)

.. _hydra_submitit: https://hydra.cc/docs/plugins/submitit_launcher/
77 changes: 76 additions & 1 deletion docs/source/usage/execution.rst
@@ -109,6 +109,78 @@ An experiment can be executed easily with the following call:
- ``max_experiments`` determines how many experiments will be executed by this ``PyExperimenter``. If set to ``-1``, it will execute experiments in a sequential fashion until no more open experiments are available.
- ``random_order`` determines if the experiments will be executed in a random order. By default, the parameter is set to ``False``, meaning that experiments will be executed ordered by their ``id``. A short example combining both parameters is sketched below.
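
As a brief illustration, the following sketch combines both parameters; the experiment function name ``run_experiment`` is only a placeholder taken from the surrounding examples:

.. code-block:: python

    # Execute at most five open experiments, picked in random order.
    experimenter.execute(run_experiment, max_experiments=5, random_order=True)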

.. _add_experiment_and_execute:

--------------------------
Add Experiment and Execute
--------------------------

Instead of filling the database table with rows and then executing the experiments, it is also possible to add an experiment and execute it directly. This can be done with the following call:

.. code-block:: python

    experimenter.add_experiment_and_execute(
        keyfields = {'dataset': 'new_data', 'cross_validation_splits': 4, 'seed': 42, 'kernel': 'poly'},
        experiment_function = run_experiment
    )

This function may be useful in the case of dependencies, where the result of one experiment is needed to configure the next, or if the experiments are supposed to be configured with software such as `Hydra <hydra_>`_.
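
As a hedged sketch of such a dependency chain, assuming that ``get_table`` returns the experiment table as a pandas DataFrame (as used in the example notebooks) and that finished experiments carry the status ``done``:

.. code-block:: python

    # Look up the seed of an already finished experiment ...
    table = experimenter.get_table()
    finished_seed = int(table.loc[table["status"] == "done", "seed"].iloc[0])

    # ... and use it to configure and immediately execute a follow-up experiment.
    experimenter.add_experiment_and_execute(
        keyfields = {'dataset': 'new_data', 'cross_validation_splits': 4, 'seed': finished_seed, 'kernel': 'rbf'},
        experiment_function = run_experiment
    )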

.. _attach:

----------------------------
Attach to Running Experiment
----------------------------

For cases of multiprocessing, where the ``experiment_function`` contains a main job that runs multiple additional workers in other processes (possibly on different machines), it is inconvenient to log all information through the main job. Therefore, we allow these workers to also attach to the database and log their information about the same experiment.

First, a worker experiment function wrapper has to be defined, which handles the execution of work in a separate process. The actual worker experiment function is defined inside the wrapper; it is then attached to the experiment and logs its information on its own. If further arguments are needed within the worker function, they can be passed to the wrapper function as keyword arguments.

.. code-block:: python

    def worker_experiment_function_wrapper(experiment_id: int, **kwargs):
        def worker_experiment_function(result_processor: ResultProcessor):
            # Worker Experiment Execution
            result = do_something_else()
            result_processor.process_logs(
                # Some Logs
            )
            return result
        return experimenter.attach(worker_experiment_function, experiment_id)

.. note::

    The ``experimenter.attach`` function returns the result of ``worker_experiment_function``.

Second, the main experiment function has to be defined. It calls the wrapper created above, provides it with the ``experiment_id``, and starts it in a different process:

.. code-block:: python

    def main_experiment_function(keyfields: dict, result_processor: ResultProcessor, custom_fields: dict):
        # Main Experiment Execution
        do_something()
        # Start worker in a different process, and provide it with the experiment_id
        result = worker_experiment_function_wrapper(result_processor.experiment_id)
        # Compute Something
        do_more()
        result_processor.process_results(
            # Results
        )

Afterwards, the main experiment function can be started as usual:

.. code-block:: python

    experimenter.execute(main_experiment_function, max_experiments=-1)
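
If a worker runs as a completely separate process, for example on another machine, it cannot reuse the ``experimenter`` object of the main job. The following is a hedged sketch of a standalone worker script, assuming that the ``experiment_id`` is handed over by the main job (here via a command-line argument) and that the worker uses the same configuration files:

.. code-block:: python

    import sys

    from py_experimenter.experimenter import PyExperimenter
    from py_experimenter.result_processor import ResultProcessor


    def worker_experiment_function(result_processor: ResultProcessor):
        # Worker Experiment Execution
        result = do_something_else()
        result_processor.process_logs(
            # Some Logs
        )
        return result


    if __name__ == "__main__":
        # The experiment_id is assumed to be passed by the main job.
        experiment_id = int(sys.argv[1])
        worker_experimenter = PyExperimenter(
            experiment_configuration_file_path = "path/to/file",
            database_credential_file_path = "path/to/file"
        )
        worker_experimenter.attach(worker_experiment_function, experiment_id)
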
.. _reset_experiments:

-----------------
@@ -214,4 +286,7 @@ If an SSH tunnel was opened during the creation of the ``PyExperimenter``, it ha
.. code-block:: python

    experimenter.execute(...)
    experimenter.close_ssh_tunnel()

.. _hydra: https://hydra.cc/
2 changes: 2 additions & 0 deletions docs/source/usage/experiment_configuration_file.rst
@@ -14,6 +14,7 @@ The experiment configuration file is primarily used to define the database backe
  Database:
    provider: sqlite
    database: py_experimenter
    use_ssh_tunnel: False
    table:
      name: example_general_usage
      keyfields:
@@ -69,6 +70,7 @@ The ``Database`` section defines the database and its structure.

- ``provider``: The provider of the database connection. Currently, ``sqlite`` and ``mysql`` are supported. In the case of ``mysql``, an additional :ref:`database credential file <database_credential_file>` has to be created.
- ``database``: The name of the database to create or connect to.
- ``use_ssh_tunnel``: Flag deciding whether the database is connected via an SSH tunnel, as defined in the :ref:`database credential file <database_credential_file>`. This is ignored if ``sqlite`` is chosen as the provider. Optional parameter; the default is ``False``.
- ``table``: Defines the structure and predefined values for the experiment table.

  - ``name``: The name of the experiment table to create or connect to.
1 change: 1 addition & 0 deletions docs/source/usage/index.rst
@@ -38,3 +38,4 @@ The following steps are necessary to execute the ``PyExperimenter``.
./database_credential_file
./experiment_function
./execution
./distributed_execution
17 changes: 12 additions & 5 deletions py_experimenter/config.py
@@ -9,11 +9,7 @@
from omegaconf import DictConfig, ListConfig, OmegaConf

from py_experimenter import utils
from py_experimenter.exceptions import (
    InvalidColumnError,
    InvalidConfigError,
    InvalidLogtableError,
)
from py_experimenter.exceptions import InvalidColumnError, InvalidConfigError, InvalidLogtableError


class Cfg(ABC):
@@ -45,6 +41,7 @@ class DatabaseCfg(Cfg):
    def __init__(
        self,
        provider: str,
        use_ssh_tunnel: bool,
        database_name: str,
        table_name: str,
        result_timestamps: bool,
@@ -58,6 +55,8 @@ def __init__(
        :param provider: Database Provider; either `sqlite` or `mysql`
        :type provider: str
        :param use_ssh_tunnel: Whether to use an SSH tunnel to connect to the database
        :type use_ssh_tunnel: bool
        :param database_name: Name of the database
        :type database_name: str
        :param table_name: Name of the table
@@ -71,6 +70,7 @@ def __init__(
        :type logtables: Dict[str, Dict[str,str]]
        """
        self.provider = provider
        self.use_ssh_tunnel = use_ssh_tunnel
        self.database_name = database_name
        self.table_name = table_name
        self.result_timestamps = result_timestamps
@@ -85,6 +85,8 @@ def extract_config(config: OmegaConf, logger: logging.Logger) -> Tuple["Database
        database_config = config["PY_EXPERIMENTER"]["Database"]
        table_config = database_config["table"]
        provider = database_config["provider"]
        # Optional use_ssh_tunnel
        use_ssh_tunnel = database_config["use_ssh_tunnel"] if "use_ssh_tunnel" in database_config else False
        database_name = database_config["database"]
        table_name = database_config["table"]["name"]

@@ -97,6 +99,7 @@

        return DatabaseCfg(
            provider,
            use_ssh_tunnel,
            database_name,
            table_name,
            result_timestamps,
@@ -208,6 +211,9 @@ def valid(self) -> bool:
        if self.provider not in ["sqlite", "mysql"]:
            self.logger.error("Database provider must be either sqlite or mysql")
            return False
        if self.use_ssh_tunnel not in [True, False]:
            self.logger.error("Use SSH tunnel must be a boolean.")
            return False
        if not isinstance(self.database_name, str):
            self.logger.error("Database name must be a string")
            return False
@@ -372,6 +378,7 @@ def extract_config(config_path: str, logger: logging.Logger) -> "PyExperimenterC
    def valid(self) -> bool:
        if not (isinstance(self.n_jobs, int) and self.n_jobs > 0):
self.logger.error("n_jobs must be a positive integer")
return False
if not (self.database_configuration.valid() and self.custom_configuration.valid() and self.codecarbon_configuration.valid()):
self.logger.error("Database configuration invalid")
return False
48 changes: 42 additions & 6 deletions py_experimenter/database_connector.py
@@ -172,8 +172,7 @@ def fill_table(self, combinations) -> None:
            if self._check_combination_in_existing_rows(combination, existing_rows):
                rows_skipped += 1
                continue
            combination["status"] = ExperimentStatus.CREATED.value
            combination["creation_date"] = time
            combination = self._add_metadata(combination, time)
            rows.append(combination)

        if rows:
@@ -183,6 +182,44 @@
        else:
            self.logger.info(f"No rows to add. All the {len(combinations)} experiments already exist.")

    def add_experiment(self, combination: Dict[str, str]) -> None:
        existing_rows = self._get_existing_rows(list(self.database_configuration.keyfields.keys()))
        if self._check_combination_in_existing_rows(combination, existing_rows):
            self.logger.info("Experiment already exists in database. Skipping.")
            return

        connection = self.connect()
        try:
            cursor = self.cursor(connection)
            combination = self._add_metadata(combination, utils.get_timestamp_representation(), ExperimentStatus.RUNNING.value)
            insert_query = self._get_insert_query(self.database_configuration.table_name, list(combination.keys()))
            self.execute(cursor, insert_query, list(combination.values()))
            cursor.execute(f"SELECT {self._last_insert_id_string()};")
            experiment_id = cursor.fetchone()[0]
            self.commit(connection)
        except Exception as e:
            raise DatabaseConnectionError(f"error \n{e}\n raised when adding experiment to database.")
        finally:
            self.close_connection(connection)
        return experiment_id

    def _get_insert_query(self, table_name: str, columns: List[str]) -> str:
        return f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join([self._prepared_statement_placeholder] * len(columns))})"

    @abc.abstractmethod
    def _last_insert_id_string(self) -> str:
        pass

    def _add_metadata(
        self,
        combination: Dict[str, Any],
        time: str,
        status: ExperimentStatus = ExperimentStatus.CREATED.value,
    ) -> Dict[str, Any]:
        combination["creation_date"] = time
        combination["status"] = status
        return combination

    def _check_combination_in_existing_rows(self, combination, existing_rows) -> bool:
        if combination in existing_rows:
            return True
@@ -239,16 +276,15 @@ def _get_pull_experiment_query(self, order_by: str):
    def _write_to_database(self, combinations: List[Dict[str, str]]) -> None:
        columns = list(combinations[0].keys())
        values = [list(combination.values()) for combination in combinations]
        prepared_statement_palcehodler = ','.join([f"({', '.join([self._prepared_statement_placeholder] * len(columns))})"] * len(combinations))
        prepared_statement_palcehodler = ",".join([f"({', '.join([self._prepared_statement_placeholder] * len(columns))})"] * len(combinations))

        stmt = f"INSERT INTO {self.database_configuration.table_name} ({','.join(columns)}) VALUES {prepared_statement_palcehodler}"
        values = reduce(concat, values)
        values = reduce(concat, values)
        connection = self.connect()
        cursor = self.cursor(connection)
        self.execute(cursor, stmt, values)
        self.commit(connection)
        self.close_connection(connection)


    def pull_paused_experiment(self, experiment_id: int) -> Dict[str, Any]:
        connnection = self.connect()
