Enable saving hyperopt checkpoints with multi-node clusters #2386
Conversation
Unit Test Results: 6 files +1, 6 suites +1, 56s ⏱️ -3h 17m 59s. For more details on these errors, see this check. Results for commit c67aaf9. ± Comparison against base commit 05ece0c. ♻️ This comment has been updated with latest results.
ludwig/utils/fs_utils.py
Outdated
if not storage_options:
    logger.info(f"Using default storage options for `{protocol}` filesystem.")
    if protocol == S3:
        s3 = S3RemoteStorageOptions()
Why is s3 handled specially here? Do we need to do anything for other filesystems?
One way to make this clearer could be:
if not storage_options:
logger.info(f"Using default storage options for `{protocol}` filesystem.")
if protocol == S3:
storage_options = S3RemoteStorageOptions().get_storage_options()
try:
return fsspec.filesystem(protocol, **storage_options)
...
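For reference, here is a minimal, self-contained sketch of the pattern the suggestion above describes. The names `get_filesystem` and `_default_s3_options` are illustrative stand-ins for the PR's `S3RemoteStorageOptions` flow, not the actual implementation:

```python
import logging
import os

import fsspec

logger = logging.getLogger(__name__)

S3 = "s3"


def _default_s3_options() -> dict:
    # Stand-in for the PR's S3RemoteStorageOptions helper: pull credentials
    # from the environment when no explicit options are supplied.
    return {
        "key": os.environ.get("AWS_ACCESS_KEY_ID"),
        "secret": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    }


def get_filesystem(protocol: str, storage_options: dict = None):
    """Create an fsspec filesystem, filling in default options only when none are given."""
    if not storage_options:
        logger.info(f"Using default storage options for `{protocol}` filesystem.")
        storage_options = _default_s3_options() if protocol == S3 else {}
    return fsspec.filesystem(protocol, **storage_options)
```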
Following up on my comment below, I now believe that this storage_options handling shouldn't be necessary. Also, this approach has an additional bug, in that it will break the existing use_credentials behavior by overriding it with credentials from the environment (when both are using s3).
@tgaddair S3 is handled specially here because users may want to pass in a different S3-compatible storage like Minio. The issue is that, to do this, users need to pass the --endpoint_url flag to the AWS CLI, because the AWS CLI currently doesn't have an environment variable that can be configured to pick up this URL automatically (although they are currently working on a proposal to add one: aws/aws-sdk#229). Within fsspec, that would involve manually passing in client_kwargs with the endpoint_url defined in the object. I think this is the only reason to do something special for S3 at the moment.
I can't entirely say what would be required for other file systems yet, but those should automatically get created via fsspec.
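For illustration, this is roughly what a user pointing at an S3-compatible store has to do manually today: the endpoint URL goes inside the nested client_kwargs dict that s3fs forwards to the underlying boto client (the URL and credentials below are made up):

```python
import fsspec

# Connect to a Minio (or other S3-compatible) endpoint via fsspec/s3fs.
fs = fsspec.filesystem(
    "s3",
    key="minio-access-key",
    secret="minio-secret-key",
    client_kwargs={"endpoint_url": "http://minio.example.com:9000"},
)
fs.ls("my-bucket")
```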
About the use_credentials method - I think that this actually won't be an issue, because both use_credentials and this approach will end up using the same key and secret, since they're derived from the same environment variables. The only thing that changes is the endpoint URL, which in either case would need to change for S3-compatible storage, since use_credentials would also require this new endpoint URL (if it is specified).
"""Get credentials from environment variables.""" | ||
|
||
def __init__(self): | ||
super().__init__(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, MLFLOW_S3_ENDPOINT_URL) |
Doesn't make sense to me to have mlflow coupling here. Looks like the idea here is to pull storage options from certain environment variables when they're not passed in explicitly. However, this shouldn't be necessary, as fsspec already has a mechanism for plumbing credentials through the environment, either through the use_credentials helper that sets the conf dictionary, or through the FSSPEC_ env vars: https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration
So I would rely on the built-in fsspec plumbing mechanisms instead of recreating them here.
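For reference, the two built-in plumbing mechanisms mentioned above look roughly like this (values are placeholders, not real credentials):

```python
import fsspec

# Option 1: set per-protocol defaults on fsspec's global config dictionary
# (this is the kind of thing a helper like use_credentials would do under the hood).
fsspec.config.conf["s3"] = {"key": "my-access-key", "secret": "my-secret-key"}

# Option 2: export per-kwarg environment variables before the process starts,
# which fsspec picks up automatically, e.g. in the shell:
#   export FSSPEC_S3_KEY=my-access-key
#   export FSSPEC_S3_SECRET=my-secret-key

# Either way, no explicit storage_options are needed when creating the filesystem.
fs = fsspec.filesystem("s3")
```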
@tgaddair the MLFLOW_S3_ENDPOINT_URL is actually just a placeholder name and can be changed to any other name. It doesn't actually end up involving any coupling with MLflow. I'll rename this environment variable to something more generic like S3_ENDPOINT_URL to make this clearer!
I like the idea of using the FSSPEC_ env vars as a potential substitute to avoid plumbing S3 credentials in this particular case with endpoint URLs. Will update and clear this up so that we don't need to do any of this manually.
Although following from the thread here (fsspec/s3fs#432) where you've requested this before as well, it doesn't seem like there's a neat way to specify the endpoint URL since it needs to be set inside a nested dictionary. How do you feel about just using this as an environment variable for now but with a different name?
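A small sketch of the env-var workaround proposed here, using the renamed S3_ENDPOINT_URL variable discussed above (the variable name is tentative and the helper is illustrative, not the PR's actual code):

```python
import os

import fsspec


def s3_filesystem_from_env():
    # The endpoint URL has to live inside the nested client_kwargs dict,
    # so read it from a dedicated environment variable and nest it manually.
    storage_options = {}
    endpoint_url = os.environ.get("S3_ENDPOINT_URL")
    if endpoint_url:
        storage_options["client_kwargs"] = {"endpoint_url": endpoint_url}
    # Key/secret are still picked up from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
    # by the underlying boto credential chain.
    return fsspec.filesystem("s3", **storage_options)
```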
ludwig/hyperopt/execution.py
Outdated
def _get_tmp_remote_checkpoint_dir(self, trial_dir: Path) -> Optional[Union[str, Tuple[str, str]]]:
    """Get the path to remote checkpoint directory."""
    if self.sync_config is None:
        return None

    if self.sync_config.upload_dir is not None:
        # Cloud storage sync config
        remote_checkpoint_dir = os.path.join(
            self.sync_config.upload_dir, "tmp", *_get_relative_checkpoints_dir_parts(trial_dir)
        )
        return remote_checkpoint_dir
    elif self.kubernetes_namespace is not None:
        # Kubernetes sync config. Returns driver node name and path.
        # When running on kubernetes, each trial is rsynced to the node running the main process.
        node_name = self._get_kubernetes_node_address_by_ip()(self.head_node_ip)
        return (node_name, trial_dir)
    else:
        logger.warning(
            "Checkpoint syncing disabled as it is only supported to remote cloud storage or on Kubernetes "
            "clusters. To use syncing, set the kubernetes_namespace in the config or use a cloud URI "
- Merge into _get_remote_checkpoint_dir with an optional "tmp" argument to avoid code duplication (a rough sketch of this follows below).
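A rough sketch of what that merge could look like; helper and attribute names follow the snippet quoted above, and everything else is assumed rather than taken from the PR:

```python
def _get_remote_checkpoint_dir(self, trial_dir: Path, tmp: bool = False) -> Optional[Union[str, Tuple[str, str]]]:
    """Get the path to the remote checkpoint directory (or its temporary variant)."""
    if self.sync_config is None:
        return None

    if self.sync_config.upload_dir is not None:
        # Cloud storage sync config; optionally nest under a "tmp" prefix.
        parts = _get_relative_checkpoints_dir_parts(trial_dir)
        if tmp:
            return os.path.join(self.sync_config.upload_dir, "tmp", *parts)
        return os.path.join(self.sync_config.upload_dir, *parts)
    elif self.kubernetes_namespace is not None:
        # Kubernetes sync config: return the driver node name and path.
        node_name = self._get_kubernetes_node_address_by_ip()(self.head_node_ip)
        return (node_name, trial_dir)
    else:
        logger.warning(
            "Checkpoint syncing disabled as it is only supported to remote cloud storage "
            "or on Kubernetes clusters."
        )
        return None
```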
Closing this in favor of #2617
This PR enables two things:
1. A custom SyncClient is created and passed into RayTune for custom checkpoint syncing behavior.
2. fsspec-related functions in fs_utils can now take in custom storage options (credentials) for different protocol types. This means that model metadata, parameters, checkpoints, etc. can now be written to remote storage such as GCS, Azure, AWS, Minio, etc. - anything that fsspec supports.
Both of these changes together enable multi-node checkpoint syncing for hyperopt.
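For illustration, a minimal sketch of the first idea: a sync client that pushes trial checkpoints to remote storage through fsspec. The SyncClient interface (sync_up/sync_down/delete) is assumed from the Ray Tune version in use at the time, and this is not the PR's actual implementation:

```python
import fsspec
from ray.tune.syncer import SyncClient


class FsspecSyncClient(SyncClient):
    """Sync trial checkpoints to any fsspec-supported remote storage."""

    def __init__(self, protocol: str, storage_options: dict = None):
        self.fs = fsspec.filesystem(protocol, **(storage_options or {}))

    def sync_up(self, source: str, target: str, exclude=None) -> bool:
        # Upload the local checkpoint directory to remote storage.
        self.fs.put(source, target, recursive=True)
        return True

    def sync_down(self, source: str, target: str, exclude=None) -> bool:
        # Download a remote checkpoint directory back to the local node.
        self.fs.get(source, target, recursive=True)
        return True

    def delete(self, target: str) -> bool:
        # Remove a remote checkpoint directory.
        self.fs.rm(target, recursive=True)
        return True
```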