Added dispatch_apply and error to Parameters creation.
Abhishek Divekar committed Sep 26, 2024
1 parent c1442cf commit 10ee277
Showing 3 changed files with 172 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/synthesizrr/base/framework/trainer/RayTuneTrainer.py
@@ -1541,6 +1541,7 @@ def get_final_metrics_stats(
                continue
            final_dataset_metrics[dataset_metric.display_name]: Dict[str, Union[int, float, Dict]] = {
                'mean': np.mean(final_dataset_metrics[dataset_metric.display_name]),
                'median': np.median(final_dataset_metrics[dataset_metric.display_name]),
                'std': np.std(final_dataset_metrics[dataset_metric.display_name], ddof=1),  ## Unbiased
                'min': np.min(final_dataset_metrics[dataset_metric.display_name]),
                'max': np.max(final_dataset_metrics[dataset_metric.display_name]),
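
This hunk adds the sample median to the per-metric summary. The same aggregation on toy values, as a standalone sketch (the helper name stats_of is hypothetical):

import numpy as np

def stats_of(values) -> dict:
    return {
        'mean': np.mean(values),        ## 4.0 for [1, 2, 3, 10]
        'median': np.median(values),    ## 2.5 (the statistic newly added above)
        'std': np.std(values, ddof=1),  ## ~4.08; ddof=1 gives the unbiased sample std
        'min': np.min(values),          ## 1
        'max': np.max(values),          ## 10
    }

stats_of([1, 2, 3, 10])
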
118 changes: 114 additions & 4 deletions src/synthesizrr/base/util/concurrency.py
@@ -15,10 +15,10 @@
import ray
from ray.exceptions import GetTimeoutError
from ray.util.dask import RayDaskCallback
-from pydantic import validate_arguments, conint, confloat
-from synthesizrr.base.util.language import ProgressBar, set_param_from_alias, type_str, get_default, first_item, Parameters
+from pydantic import conint, confloat
+from synthesizrr.base.util.language import ProgressBar, set_param_from_alias, type_str, get_default, first_item, Parameters, \
+    is_list_or_set_like, is_dict_like, PandasSeries, filter_kwargs
from synthesizrr.base.constants.DataProcessingConstants import Parallelize, FailureAction, Status, COMPLETED_STATUSES
-
from functools import partial
## Jupyter-compatible asyncio usage:
import asyncio
@@ -445,6 +445,117 @@ def dispatch_executor(
    return None


def dispatch_apply(
        struct: Union[List, Tuple, np.ndarray, PandasSeries, Set, frozenset, Dict],
        *args,
        fn: Callable,
        parallelize: Parallelize,
        forward_parallelize: bool = False,
        item_wait: Optional[float] = None,
        iter_wait: Optional[float] = None,
        iter: bool = False,
        **kwargs
) -> Any:
    parallelize: Parallelize = Parallelize.from_str(parallelize)
    ## Pick per-item and per-iteration polling waits suited to the chosen execution mode:
    item_wait: float = get_default(
        item_wait,
        {
            Parallelize.ray: _RAY_ACCUMULATE_ITEM_WAIT,
            Parallelize.processes: _LOCAL_ACCUMULATE_ITEM_WAIT,
            Parallelize.threads: _LOCAL_ACCUMULATE_ITEM_WAIT,
            Parallelize.asyncio: 0.0,
            Parallelize.sync: 0.0,
        }[parallelize]
    )
    iter_wait: float = get_default(
        iter_wait,
        {
            Parallelize.ray: _RAY_ACCUMULATE_ITER_WAIT,
            Parallelize.processes: _LOCAL_ACCUMULATE_ITER_WAIT,
            Parallelize.threads: _LOCAL_ACCUMULATE_ITER_WAIT,
            Parallelize.asyncio: 0.0,
            Parallelize.sync: 0.0,
        }[parallelize]
    )
    if forward_parallelize:
        kwargs['parallelize'] = parallelize
    executor: Optional = dispatch_executor(
        parallelize=parallelize,
        **kwargs,
    )
    try:
        set_param_from_alias(kwargs, param='progress_bar', alias=['progress', 'pbar'], default=True)
        progress_bar: Union[ProgressBar, Dict, bool] = kwargs.pop('progress_bar', False)
        submit_pbar: ProgressBar = ProgressBar.of(
            progress_bar,
            total=len(struct),
            desc='Submitting',
            prefer_kwargs=False,
            unit='item',
        )
        collect_pbar: ProgressBar = ProgressBar.of(
            progress_bar,
            total=len(struct),
            desc='Collecting',
            prefer_kwargs=False,
            unit='item',
        )
        if is_list_or_set_like(struct):
            futs = []
            for v in struct:
                def submit_task(item, **dispatch_kwargs):
                    return fn(item, **dispatch_kwargs)

                futs.append(
                    dispatch(
                        fn=submit_task,
                        item=v,
                        parallelize=parallelize,
                        executor=executor,
                        delay=item_wait,
                        **filter_kwargs(fn, **kwargs),
                    )
                )
                submit_pbar.update(1)
        elif is_dict_like(struct):
            futs = {}
            for k, v in struct.items():
                def submit_task(item, **dispatch_kwargs):
                    return fn(item, **dispatch_kwargs)

                futs[k] = dispatch(
                    fn=submit_task,
                    key=k,
                    item=v,
                    parallelize=parallelize,
                    executor=executor,
                    delay=item_wait,
                    **filter_kwargs(fn, **kwargs),
                )
                submit_pbar.update(1)
        else:
            raise NotImplementedError(f'Unsupported type: {type_str(struct)}')
        submit_pbar.success()
        ## Either stream results as they complete, or block and gather them all:
        if iter:
            return accumulate_iter(
                futs,
                item_wait=item_wait,
                iter_wait=iter_wait,
                progress_bar=collect_pbar,
                **kwargs
            )
        else:
            return accumulate(
                futs,
                item_wait=item_wait,
                iter_wait=iter_wait,
                progress_bar=collect_pbar,
                **kwargs
            )
    finally:
        stop_executor(executor)
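
In short, dispatch_apply submits fn once per element of struct (list-like or dict-like), spacing submissions by item_wait, then gathers results with accumulate (or accumulate_iter when iter=True). A minimal usage sketch (the toy function is hypothetical; it accepts extra keyword arguments because the dict branch above also forwards each key; keyed output for dict inputs is an assumption based on accumulate() receiving a dict of futures):

def square(item: int, **kwargs) -> int:
    return item * item

## List-like input: one task per element, here on a thread executor.
results = dispatch_apply(
    [1, 2, 3, 4],
    fn=square,
    parallelize=Parallelize.threads,
    progress_bar=False,
)  ## -> [1, 4, 9, 16]

## Dict-like input: futures are stored per key.
keyed = dispatch_apply(
    {'a': 2, 'b': 3},
    fn=square,
    parallelize=Parallelize.sync,
    progress_bar=False,
)  ## -> {'a': 4, 'b': 9} (assumed keyed result)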


def get_result(
        x,
        *,
@@ -785,7 +896,6 @@ def wait(
    wait_if_future(futures)


-@validate_arguments
def retry(
        fn,
        *args,
64 changes: 57 additions & 7 deletions src/synthesizrr/base/util/language.py
@@ -602,13 +602,19 @@ def _create_param(p: inspect.Parameter) -> inspect.Parameter:
    wrapper.__signature__ = sig
    wrapper.__annotations__ = {f"{n}_" if n in names_to_fix else n: v for n, v in f.__annotations__.items()}

-    return validate_arguments(
-        wrapper,
-        config={
-            "allow_population_by_field_name": True,
-            "arbitrary_types_allowed": True,
-        }
-    )
+    try:
+        return validate_arguments(
+            wrapper,
+            config={
+                "allow_population_by_field_name": True,
+                "arbitrary_types_allowed": True,
+            }
+        )
+    except Exception as e:
+        raise ValueError(
+            f'Error creating model for function {get_fn_spec(f).resolved_name}.'
+            f'\nEncountered Exception: {format_exception_msg(e)}'
+        )
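
The new try/except surfaces which function failed when pydantic cannot build a validation model at decoration time. The same pattern in isolation (a minimal sketch, assuming pydantic v1; the helper name safe_validate_arguments is hypothetical):

from pydantic import validate_arguments

def safe_validate_arguments(f):
    try:
        return validate_arguments(f, config={"arbitrary_types_allowed": True})
    except Exception as e:
        ## Re-raise with the offending function's name, instead of a bare pydantic error.
        raise ValueError(f'Error creating model for function {f.__qualname__}: {e}')

@safe_validate_arguments
def add(x: int, y: int) -> int:
    return x + y

assert add('2', 3) == 5  ## pydantic v1 coerces '2' -> 2 at call time.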


def not_impl(
@@ -1531,6 +1537,27 @@ def invert_dict(d: Dict) -> Dict:
    return d_inv


def iter_dict(d, depth: int = 1, *, _cur_depth: int = 0):
    """
    Recursively iterate over nested dictionaries and yield keys at each depth.
    :param d: The dictionary to iterate over.
    :param depth: The maximum key-depth to descend before yielding values as leaves.
    :param _cur_depth: The current recursion depth (internal, used to track how deep we are).
    :return: Yields tuples where the leading elements are keys at successive depths, and the last element is the value.
    """
    assert isinstance(d, dict), f'Input must be a dictionary, found: {type(d)}'
    assert isinstance(depth, int) and depth >= 1, f'depth must be an integer (1 or more), found: {depth}'

    for k, v in d.items():
        if isinstance(v, dict) and _cur_depth < depth - 1:
            ## If the value is a dictionary and the depth limit is not yet reached, recurse:
            for subkeys in iter_dict(v, _cur_depth=_cur_depth + 1, depth=depth):
                yield (k,) + subkeys
        else:
            ## Otherwise, yield the key-path and value:
            yield (k, v)
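
A quick usage sketch on a toy nested dict:

config = {'model': {'lr': 0.1, 'layers': 4}, 'seed': 42}

list(iter_dict(config))           ## [('model', {'lr': 0.1, 'layers': 4}), ('seed', 42)]
list(iter_dict(config, depth=2))  ## [('model', 'lr', 0.1), ('model', 'layers', 4), ('seed', 42)]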


## ======================== NumPy utils ======================== ##
def is_numpy_integer_array(data: Any) -> bool:
    if not isinstance(data, np.ndarray):

@@ -2625,6 +2652,15 @@ class Parameters(BaseModel, ABC):
    aliases: ClassVar[Tuple[str, ...]] = tuple()
    dict_exclude: ClassVar[Tuple[str, ...]] = tuple()

    def __init__(self, *args, **kwargs):
        try:
            super().__init__(*args, **kwargs)
        except Exception as e:
            raise ValueError(
                f'Cannot create Pydantic instance of type "{self.class_name}".'
                f'\nEncountered exception: {format_exception_msg(e)}'
            )

    @classproperty
    def class_name(cls) -> str:
        return str(cls.__name__)  ## Will return the child class name.
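
The __init__ override above turns opaque pydantic validation errors into a ValueError naming the concrete subclass. To illustrate (a hypothetical subclass; assumes the usual pydantic field validation):

class TrainerConfig(Parameters):  ## Hypothetical subclass for illustration.
    lr: float
    epochs: int

TrainerConfig(lr=0.1, epochs=3)     ## OK.
TrainerConfig(lr='fast', epochs=3)  ## Raises ValueError: 'Cannot create Pydantic
                                    ## instance of type "TrainerConfig". ...'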
@@ -3227,6 +3263,15 @@ def create_progress_bar(
            smoothing=smoothing,
            **kwargs
        )
    elif style == 'ray':
        from ray.experimental import tqdm_ray
        kwargs = filter_keys(
            kwargs,
            keys=set(get_fn_spec(tqdm_ray.tqdm).args + get_fn_spec(tqdm_ray.tqdm).kwargs),
            how='include',
        )
        return tqdm_ray.tqdm(**kwargs)
    else:
        return StdTqdmProgressBar(
            ncols=ncols,
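
The 'ray' branch above filters kwargs down to the parameters accepted by ray.experimental.tqdm_ray. For context, tqdm_ray provides a bar whose updates are aggregated on the driver, so remote tasks can report progress; a minimal standalone sketch following the pattern in Ray's docs (assumes a running Ray cluster):

import ray
from ray.experimental import tqdm_ray

ray.init()
bar = tqdm_ray.tqdm(total=100, desc='processing')

@ray.remote
def step(bar: tqdm_ray.tqdm):
    bar.update(1)  ## Forwarded to the driver-side aggregated bar.

ray.get([step.remote(bar) for _ in range(100)])
bar.close()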
@@ -3311,6 +3356,11 @@ def ignore_all_output():
    yield


@contextmanager
def ignore_nothing():
    yield
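
A no-op context manager pairs naturally with its suppressing counterpart when suppression is conditional; a small sketch (run_training is a hypothetical workload):

quiet: bool = False
with (ignore_all_output() if quiet else ignore_nothing()):
    run_training()  ## Output is suppressed only when quiet=True.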


# from pydantic import Field, AliasChoices
# def Alias(*, default: Optional[Any] = None, alias: Union[Tuple[str, ...], List[str], Set[str], str]):
# alias: AliasChoices = AliasChoices(*as_tuple(alias))
Expand Down
