Adding live CSV config details data appending
TarekAbouChakra committed Nov 20, 2023
1 parent 5d3f851 commit 3655898
Showing 3 changed files with 100 additions and 27 deletions.
16 changes: 11 additions & 5 deletions src/metahyper/api.py
@@ -292,14 +292,14 @@ def _sample_config(optimization_dir, sampler, serializer, logger, pre_load_hooks
base_result_directory = optimization_dir / "results"

logger.debug("Sampling a new configuration")

for hook in pre_load_hooks:
# executes operations on the sampler before setting its state
# can be used for setting custom constraints on the optimizer state
# for example, can be used to input custom grid of configs, meta learning
# for example, can be used to input custom grid of configs, meta learning
# information for surrogate building, any non-stationary auxiliary information
sampler = hook(sampler)

sampler.load_results(previous_results, pending_configs)
config, config_id, previous_config_id = sampler.get_config_and_ids()

@@ -426,7 +426,8 @@ def run(
logger=None,
post_evaluation_hook=None,
overwrite_optimization_dir=False,
pre_load_hooks: List=[],
pre_load_hooks: List = [],
in_run_summary: bool = False,
):
serializer = YamlSerializer(sampler.load_config)
if logger is None:
@@ -551,7 +552,12 @@ def run(
# 3. Anything the user might want to do after the evaluation
if post_evaluation_hook is not None:
post_evaluation_hook(
config, config_id, pipeline_directory, result, logger
config,
config_id,
pipeline_directory,
result,
logger,
in_run_summary,
)
else:
logger.info(f"Finished evaluating config {config_id}")
57 changes: 37 additions & 20 deletions src/neps/api.py
@@ -6,7 +6,7 @@
import logging
import warnings
from pathlib import Path
from typing import Callable, Literal, List
from typing import Callable, List, Literal

import ConfigSpace as CS

@@ -17,7 +17,7 @@
from .plot.tensorboard_eval import tblogger
from .search_spaces.parameter import Parameter
from .search_spaces.search_space import SearchSpace, pipeline_space_from_configspace
from .status.status import post_run_csv
from .status.status import in_run_csv, post_run_csv
from .utils.common import get_searcher_data
from .utils.result_utils import get_loss

@@ -31,6 +31,7 @@ def _post_evaluation_hook(
config_working_directory,
result,
logger,
in_run_summary,
loss_value_on_error=_loss_value_on_error,
ignore_errors=_ignore_errors,
):
@@ -90,15 +91,20 @@ def write_loss_and_config(file_handle, loss_, config_id_, config_):

tblogger.end_of_config()

if in_run_summary:
in_run_csv(working_directory, logger)

return _post_evaluation_hook


def run(
run_pipeline: Callable,
pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
pipeline_space: dict[str, Parameter | CS.ConfigurationSpace]
| CS.ConfigurationSpace,
root_directory: str | Path,
overwrite_working_directory: bool = False,
post_run_summary: bool = False,
in_run_summary: bool = False,
development_stage_id=None,
task_id=None,
max_evaluations_total: int | None = None,
@@ -108,7 +114,7 @@ def run(
ignore_errors: bool = False,
loss_value_on_error: None | float = None,
cost_value_on_error: None | float = None,
pre_load_hooks: List=[],
pre_load_hooks: List = [],
searcher: Literal[
"default",
"bayesian_optimization",
@@ -204,23 +210,23 @@
)
max_cost_total = searcher_kwargs["budget"]
del searcher_kwargs["budget"]

logger = logging.getLogger("neps")
logger.info(f"Starting neps.run using root directory {root_directory}")

if isinstance(searcher, BaseOptimizer):
searcher_instance = searcher
searcher_name = "custom"
searcher_alg = searcher.whoami()
user_defined_searcher = True
else:
(
(
searcher_name,
searcher_instance,
searcher_alg,
searcher_config,
searcher_info,
user_defined_searcher
searcher_instance,
searcher_alg,
searcher_config,
searcher_info,
user_defined_searcher,
) = _run_args(
pipeline_space=pipeline_space,
max_cost_total=max_cost_total,
@@ -277,7 +283,7 @@ def run(
searcher_info["searcher_args_user_modified"] = False
else:
raise ValueError(f"Unrecognized `searcher`. Not str or BaseOptimizer.")

metahyper.run(
run_pipeline,
searcher_instance,
@@ -294,14 +300,16 @@ def run(
),
overwrite_optimization_dir=overwrite_working_directory,
pre_load_hooks=pre_load_hooks,
in_run_summary=in_run_summary,
)

if post_run_csv:
if post_run_summary:
post_run_csv(root_directory, logger)


def _run_args(
pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
pipeline_space: dict[str, Parameter | CS.ConfigurationSpace]
| CS.ConfigurationSpace,
max_cost_total: int | float | None = None,
ignore_errors: bool = False,
loss_value_on_error: None | float = None,
@@ -335,7 +343,7 @@
else:
new_pipeline_space[key] = value
pipeline_space = new_pipeline_space

# Transform to neps internal representation of the pipeline space
pipeline_space = SearchSpace(**pipeline_space)
except TypeError as e:
@@ -368,7 +376,9 @@
config = get_searcher_data(searcher)

searcher_alg = config["searcher_init"]["algorithm"]
searcher_config = {} if config["searcher_kwargs"] is None else config["searcher_kwargs"]
searcher_config = (
{} if config["searcher_kwargs"] is None else config["searcher_kwargs"]
)

logger.info(f"Running {searcher} as the searcher")
logger.info(f"Algorithm: {searcher_alg}")
@@ -414,13 +424,20 @@
"ignore_errors": ignore_errors,
}
)

searcher_instance = instance_from_map(
SearcherMapping, searcher_alg, "searcher", as_class=True
)(
pipeline_space=pipeline_space,
budget=max_cost_total, # TODO: use max_cost_total everywhere
**searcher_config,
)

return searcher, searcher_instance, searcher_alg, searcher_config, searcher_info, user_defined_searcher

return (
searcher,
searcher_instance,
searcher_alg,
searcher_config,
searcher_info,
user_defined_searcher,
)
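Together, these neps.api changes expose the feature to users: neps.run now accepts in_run_summary (config details appended to a CSV after every evaluation) alongside the existing post_run_summary (full summary written once at the end). A minimal usage sketch follows; the one-parameter search space, placeholder objective, and paths are illustrative assumptions, not part of this commit.

import neps

def run_pipeline(lr):
    return {"loss": lr}  # placeholder objective

neps.run(
    run_pipeline=run_pipeline,
    pipeline_space={"lr": neps.FloatParameter(lower=1e-4, upper=1e-1)},
    root_directory="results/example",
    max_evaluations_total=5,
    in_run_summary=True,    # append summary_csv/config_details.csv during the run
    post_run_summary=True,  # write the full summary CSVs after the run finishes
)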
54 changes: 52 additions & 2 deletions src/neps/status/status.py
@@ -8,7 +8,8 @@

from metahyper import read
from metahyper._locker import Locker
from metahyper.api import ConfigResult
from metahyper.api import ConfigInRun, ConfigResult, _read_config_result
from metahyper.utils import YamlSerializer

from ..search_spaces.search_space import SearchSpace
from ..utils.result_utils import get_loss
@@ -228,7 +229,9 @@ def _get_dataframes_from_summary(
)

# Concatenate the two DataFrames
df_config_data = pd.concat([df_previous, df_pending], join="outer", ignore_index=True)
df_config_data = pd.concat(
[df_previous, df_pending], join="outer", ignore_index=True
)

# Create a dataframe with the specified additional summary data
additional_data = {
@@ -339,3 +342,50 @@ def post_run_csv(root_directory: str | Path, logger=None) -> None:
df_config_data,
df_run_data,
)


def in_run_csv(root_directory: str | Path, logger=None):
if logger is None:
logger = logging.getLogger("neps_status")

root_directory = Path(root_directory)
summary_csv_directory = Path(root_directory / "summary_csv")
summary_csv_directory.mkdir(parents=True, exist_ok=True)

csv_config_details = summary_csv_directory / "config_details.csv"

csv_lock_file = summary_csv_directory / ".csv_lock"
csv_lock_file.touch(exist_ok=True)
csv_locker = Locker(csv_lock_file, logger.getChild("_locker"))

serializer = YamlSerializer()

config_result = _read_config_result(ConfigInRun.pipeline_directory, serializer)

df = pd.DataFrame({"Config_id": [ConfigInRun.config_id]})
df = pd.concat(
[df, pd.json_normalize(config_result.config).add_prefix("config.")], axis=1
)
df = pd.concat(
[df, pd.json_normalize(config_result.metadata).add_prefix("metadata.")], axis=1
)
df = pd.concat(
[df, pd.json_normalize(config_result.result).add_prefix("result.")], axis=1
)

should_break = False
while not should_break:
try:
if csv_locker.acquire_lock():
# Check if the file already exists
file_exists = csv_config_details.is_file()

# Append data to CSV without header if the file exists
df.to_csv(
csv_config_details, index=False, header=not file_exists, mode="a"
)
except Exception as e:
raise e
finally:
csv_locker.release_lock()
should_break = True
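The new in_run_csv flattens one evaluated config into a single row (Config_id plus config.*, metadata.*, and result.* columns) and appends it to summary_csv/config_details.csv under a file lock, writing the header only when the file does not exist yet. A sketch of inspecting that file while a run is still active; the path and the result.loss column are assumptions that follow from the code above, not guaranteed names.

import pandas as pd

df = pd.read_csv("results/example/summary_csv/config_details.csv")
print(df[["Config_id", "result.loss"]].tail())  # most recently appended configs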
