From 0980d2c54cd39be35f01d71c87ad131148269edc Mon Sep 17 00:00:00 2001 From: pmannan Date: Mon, 25 Mar 2024 21:19:11 +0000 Subject: [PATCH 01/11] Adding parallelism strategies to benchmark_litgpt.py --- thunder/benchmarks/benchmark_litgpt.py | 52 ++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 9120584989..7f0c852957 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -5,6 +5,7 @@ import functools from torch.utils.data import DataLoader, IterableDataset import torch.distributed as torch_dist +from torch.distributed.device_mesh import init_device_mesh import thunder from thunder.tests.lit_gpt_model import Config, GPT, Block @@ -16,6 +17,7 @@ world_size = int(os.environ.get("WORLD_SIZE", 1)) local_rank = int(os.environ.get("LOCAL_RANK", 0)) global_rank = int(os.environ.get("RANK", 0)) +nnodes = int(os.environ.get("NNODES", 1)) if world_size > 1: torch_dist.init_process_group(backend="nccl") pg = torch_dist.distributed_c10d._get_default_group() @@ -47,6 +49,8 @@ def __init__( shard_mode: str = "zero2", bucketing_mode: str = "none", sharding_size: int | None = None, + ddp_bucket_size: float = 256.0, + fsdp_bucket_params: float | None = None, n_layers: int | None = None, profiler_start: int = 15, profiler_stop: int = 15, @@ -74,7 +78,32 @@ def __init__( self.shard_mode = shard_mode self.bucketing_mode = bucketing_mode self.sharding_size = sharding_size + self.ddp_bucket_size = ddp_bucket_size + self.fsdp_bucket_params = fsdp_bucket_params self.micro_batch_size = micro_batch_size + + #Clarify benchmark assumptions + if self.sharding_size is not None: + assert "thunder" not in self.compile, \ + "Hybrid Sharding (FSDP/DP) using --sharding_size is not yet supported for Thunder. Coming soon." + + assert self.shard_mode == "hybrid_dp", \ + "Sharding Size is only used with Hybrid FSDP/DP style parallelism. Please " + + if self.bucketing_mode != "none" and self.distributed_mode != "fsdp": + print(f"[WARNING] --bucketing_mode {self.bucketing_mode} will be ignored as \ + it is only used for FSDP style parallelism but running {self.distributed_mode}") + + assert not "thunder" in self.compile and self.bucketing_mode == "size", \ + "'size' bucketing mode is not supported for Thunder. Please use 'none' or 'block'." + + if self.fsdp_bucket_params is not None: + if self.distributed_mode != "fsdp": + print(f"[WARNING] Found --fsdp_bucket_params but Distributed mode is {self.distributed_mode}. Will be ignnored") + + if self.bucketing_mode != "size": + print(f"[WARNING] Bucketing mode is set to {self.bucketing_mode}. 
--fsdp_bucket_params will be ignoted.") + if global_batch_size is not None: self.global_batch_size = global_batch_size else: @@ -153,7 +182,7 @@ def setup_distributed(self): model = ddp( self.model, broadcast_from=0, - bucket_size_in_mb=256.0, + bucket_size_in_mb=self.ddp_bucket_size, ) elif self.distributed_mode == "fsdp": from thunder.distributed import fsdp, FSDPType, FSDPBucketingStrategy @@ -173,26 +202,41 @@ def setup_distributed(self): model = torch.nn.parallel.DistributedDataParallel( self.model, device_ids=[local_rank], - bucket_cap_mb=256.0, + bucket_cap_mb=self.ddp_bucket_size, ) elif self.distributed_mode == "fsdp": from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, ShardingStrategy - from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy + from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy, size_based_auto_wrap_policy + + mesh = None + if self.sharding_size is not None: + mesh = init_device_mesh("cuda", (nnodes, self.sharding_size)) litgpt_auto_wrap_policy = functools.partial(transformer_auto_wrap_policy, transformer_layer_cls={Block}) + size_auto_wrap_policy = functools.partial(size_based_auto_wrap_policy, min_num_params=self.fsdp_bucket_params) zero_bucket_wrap_policy = lambda module, recurse, nonwrapped_numel: nonwrapped_numel >= 0 + + custom_wrap_policy = { + "block": litgpt_auto_wrap_policy, + "size": size_auto_wrap_policy, + "none": zero_bucket_wrap_policy, + }[self.bucketing_mode] + sharding_strategy: ShardingStrategy = { "zero2": ShardingStrategy.SHARD_GRAD_OP, "zero3": ShardingStrategy.FULL_SHARD, + "hybrid_dp": ShardingStrategy.HYBRID_SHARD, }[self.shard_mode] + # AssertionError: Dynamo only supports FSDP with use_orig_params=True torch.cuda.set_device(local_rank) model = FSDP( self.model, sharding_strategy=sharding_strategy, - auto_wrap_policy=litgpt_auto_wrap_policy, + auto_wrap_policy=custom_wrap_policy, device_id=local_rank, use_orig_params=True, + device_mesh=mesh, ) return model From c6d4074f9c2b186a41f34e567c32ec4e06e0f146 Mon Sep 17 00:00:00 2001 From: pmannan Date: Mon, 25 Mar 2024 22:02:43 +0000 Subject: [PATCH 02/11] Fixes to Hybrid FSDP and assert --- thunder/benchmarks/benchmark_litgpt.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 6bb4f8f563..be740678d2 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -17,7 +17,6 @@ world_size = int(os.environ.get("WORLD_SIZE", 1)) local_rank = int(os.environ.get("LOCAL_RANK", 0)) global_rank = int(os.environ.get("RANK", 0)) -nnodes = int(os.environ.get("NNODES", 1)) if world_size > 1: torch_dist.init_process_group(backend="nccl") pg = torch_dist.distributed_c10d._get_default_group() @@ -94,7 +93,7 @@ def __init__( print(f"[WARNING] --bucketing_mode {self.bucketing_mode} will be ignored as \ it is only used for FSDP style parallelism but running {self.distributed_mode}") - assert not "thunder" in self.compile and self.bucketing_mode == "size", \ + assert not ("thunder" in self.compile and self.bucketing_mode == "size"), \ "'size' bucketing mode is not supported for Thunder. Please use 'none' or 'block'." 
if self.fsdp_bucket_params is not None: @@ -210,7 +209,7 @@ def setup_distributed(self): mesh = None if self.sharding_size is not None: - mesh = init_device_mesh("cuda", (nnodes, self.sharding_size)) + mesh = init_device_mesh("cuda", (int(world_size/self.sharding_size), self.sharding_size)) litgpt_auto_wrap_policy = functools.partial(transformer_auto_wrap_policy, transformer_layer_cls={Block}) size_auto_wrap_policy = functools.partial(size_based_auto_wrap_policy, min_num_params=self.fsdp_bucket_params) From bc4dec9c41459b23202177b32d971fb0c0dbb13c Mon Sep 17 00:00:00 2001 From: pmannan Date: Mon, 25 Mar 2024 22:04:42 +0000 Subject: [PATCH 03/11] Add sharding_size assumption check --- thunder/benchmarks/benchmark_litgpt.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index be740678d2..d9bef5783b 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -89,6 +89,9 @@ def __init__( assert self.shard_mode == "hybrid_dp", \ "Sharding Size is only used with Hybrid FSDP/DP style parallelism. Please " + assert world_size % self.sharding_size == 0, \ + f"World size {world_size} is not divisible by Hybrid Sharding Size {self.sharding_size}" + if self.bucketing_mode != "none" and self.distributed_mode != "fsdp": print(f"[WARNING] --bucketing_mode {self.bucketing_mode} will be ignored as \ it is only used for FSDP style parallelism but running {self.distributed_mode}") From 32aaa911db3cfa41cdbaa64b5310e8ede3f657b4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 25 Mar 2024 22:08:56 +0000 Subject: [PATCH 04/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- thunder/benchmarks/benchmark_litgpt.py | 48 ++++++++++++++++---------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index d9bef5783b..e6b5db3413 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -49,7 +49,7 @@ def __init__( bucketing_mode: str = "none", sharding_size: int | None = None, ddp_bucket_size: float = 256.0, - fsdp_bucket_params: float | None = None, + fsdp_bucket_params: float | None = None, n_layers: int | None = None, profiler_start: int = 15, profiler_stop: int = 15, @@ -81,30 +81,40 @@ def __init__( self.fsdp_bucket_params = fsdp_bucket_params self.micro_batch_size = micro_batch_size - #Clarify benchmark assumptions + # Clarify benchmark assumptions if self.sharding_size is not None: - assert "thunder" not in self.compile, \ - "Hybrid Sharding (FSDP/DP) using --sharding_size is not yet supported for Thunder. Coming soon." + assert ( + "thunder" not in self.compile + ), "Hybrid Sharding (FSDP/DP) using --sharding_size is not yet supported for Thunder. Coming soon." - assert self.shard_mode == "hybrid_dp", \ - "Sharding Size is only used with Hybrid FSDP/DP style parallelism. Please " + assert ( + self.shard_mode == "hybrid_dp" + ), "Sharding Size is only used with Hybrid FSDP/DP style parallelism. 
Please " - assert world_size % self.sharding_size == 0, \ - f"World size {world_size} is not divisible by Hybrid Sharding Size {self.sharding_size}" + assert ( + world_size % self.sharding_size == 0 + ), f"World size {world_size} is not divisible by Hybrid Sharding Size {self.sharding_size}" if self.bucketing_mode != "none" and self.distributed_mode != "fsdp": - print(f"[WARNING] --bucketing_mode {self.bucketing_mode} will be ignored as \ - it is only used for FSDP style parallelism but running {self.distributed_mode}") + print( + f"[WARNING] --bucketing_mode {self.bucketing_mode} will be ignored as \ + it is only used for FSDP style parallelism but running {self.distributed_mode}" + ) - assert not ("thunder" in self.compile and self.bucketing_mode == "size"), \ - "'size' bucketing mode is not supported for Thunder. Please use 'none' or 'block'." + assert not ( + "thunder" in self.compile and self.bucketing_mode == "size" + ), "'size' bucketing mode is not supported for Thunder. Please use 'none' or 'block'." if self.fsdp_bucket_params is not None: if self.distributed_mode != "fsdp": - print(f"[WARNING] Found --fsdp_bucket_params but Distributed mode is {self.distributed_mode}. Will be ignnored") + print( + f"[WARNING] Found --fsdp_bucket_params but Distributed mode is {self.distributed_mode}. Will be ignnored" + ) if self.bucketing_mode != "size": - print(f"[WARNING] Bucketing mode is set to {self.bucketing_mode}. --fsdp_bucket_params will be ignoted.") + print( + f"[WARNING] Bucketing mode is set to {self.bucketing_mode}. --fsdp_bucket_params will be ignoted." + ) if global_batch_size is not None: self.global_batch_size = global_batch_size @@ -212,10 +222,12 @@ def setup_distributed(self): mesh = None if self.sharding_size is not None: - mesh = init_device_mesh("cuda", (int(world_size/self.sharding_size), self.sharding_size)) + mesh = init_device_mesh("cuda", (int(world_size / self.sharding_size), self.sharding_size)) litgpt_auto_wrap_policy = functools.partial(transformer_auto_wrap_policy, transformer_layer_cls={Block}) - size_auto_wrap_policy = functools.partial(size_based_auto_wrap_policy, min_num_params=self.fsdp_bucket_params) + size_auto_wrap_policy = functools.partial( + size_based_auto_wrap_policy, min_num_params=self.fsdp_bucket_params + ) zero_bucket_wrap_policy = lambda module, recurse, nonwrapped_numel: nonwrapped_numel >= 0 custom_wrap_policy = { @@ -223,13 +235,13 @@ def setup_distributed(self): "size": size_auto_wrap_policy, "none": zero_bucket_wrap_policy, }[self.bucketing_mode] - + sharding_strategy: ShardingStrategy = { "zero2": ShardingStrategy.SHARD_GRAD_OP, "zero3": ShardingStrategy.FULL_SHARD, "hybrid_dp": ShardingStrategy.HYBRID_SHARD, }[self.shard_mode] - + # AssertionError: Dynamo only supports FSDP with use_orig_params=True torch.cuda.set_device(local_rank) model = FSDP( From e152da324a1e863b50dbc6980d47c77714f03da5 Mon Sep 17 00:00:00 2001 From: pmannan Date: Mon, 25 Mar 2024 22:24:10 +0000 Subject: [PATCH 05/11] Print the additional configurations when displaying results --- thunder/benchmarks/benchmark_litgpt.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index d9bef5783b..ae73d89704 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -456,8 +456,16 @@ def benchmark_main(return_metrics_as_json=False, json_path="", **kwargs) -> None print(f"Distributed Mode: {benchmark.distributed_mode}") if 
benchmark.distributed_mode == "fsdp": print( - f"Sharding Mode: {benchmark.shard_mode}\nSharding Size: {benchmark.sharding_size}\nBucketing: {benchmark.bucketing_mode}" + f"Sharding Mode: {benchmark.shard_mode}\nBucketing: {benchmark.bucketing_mode}" ) + if benchmark.sharding_size is not None: + print( + f"Sharding Size: {benchmark.sharding_size}\nReplicate DP Groups: {int(world_size/benchmark.sharding_size)}" + ) + if benchmark.bucketing_mode == "size": + print(f"Bucketing Number Params: {self.fsdp_bucket_params}") + elif benchmark.distributed_mode == "ddp": + print(f"DDP Bucketing Size: {self.ddp_bucket_size} MB") print(f"Compiler: {benchmark.compile}") print(f"Average iter time: {benchmark.perf_metrics['average_iter_time']:.2f} ms") print(f"Memory used: {benchmark.perf_metrics['memory_used_GB']:.02f} GB") From ae5bb3e7062c21b3f4e39c3a7440dcc49d070c93 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 25 Mar 2024 22:25:12 +0000 Subject: [PATCH 06/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- thunder/benchmarks/benchmark_litgpt.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 952b97ed92..da398f171f 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -467,9 +467,7 @@ def benchmark_main(return_metrics_as_json=False, json_path="", **kwargs) -> None ) print(f"Distributed Mode: {benchmark.distributed_mode}") if benchmark.distributed_mode == "fsdp": - print( - f"Sharding Mode: {benchmark.shard_mode}\nBucketing: {benchmark.bucketing_mode}" - ) + print(f"Sharding Mode: {benchmark.shard_mode}\nBucketing: {benchmark.bucketing_mode}") if benchmark.sharding_size is not None: print( f"Sharding Size: {benchmark.sharding_size}\nReplicate DP Groups: {int(world_size/benchmark.sharding_size)}" From b8b4aa9b15007e0e2d4b68fef0d541cbf569ed39 Mon Sep 17 00:00:00 2001 From: pmannan Date: Mon, 25 Mar 2024 23:43:43 +0000 Subject: [PATCH 07/11] Fix print mistake --- thunder/benchmarks/benchmark_litgpt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 952b97ed92..b2d5e8c3b5 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -475,9 +475,9 @@ def benchmark_main(return_metrics_as_json=False, json_path="", **kwargs) -> None f"Sharding Size: {benchmark.sharding_size}\nReplicate DP Groups: {int(world_size/benchmark.sharding_size)}" ) if benchmark.bucketing_mode == "size": - print(f"Bucketing Number Params: {self.fsdp_bucket_params}") + print(f"Bucketing Number Params: {benchmark.fsdp_bucket_params}") elif benchmark.distributed_mode == "ddp": - print(f"DDP Bucketing Size: {self.ddp_bucket_size} MB") + print(f"DDP Bucketing Size: {benchmark.ddp_bucket_size} MB") print(f"Compiler: {benchmark.compile}") print(f"Average iter time: {benchmark.perf_metrics['average_iter_time']:.2f} ms") print(f"Memory used: {benchmark.perf_metrics['memory_used_GB']:.02f} GB") From f303ec8aeb517b132a877544147a9734dc2ee29d Mon Sep 17 00:00:00 2001 From: pmannan Date: Tue, 26 Mar 2024 23:37:21 +0000 Subject: [PATCH 08/11] Add support for _HYBRID_SHARD_ZERO2 for Eager/Torch.Compile --- thunder/benchmarks/benchmark_litgpt.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 514d09b63a..6c0e48943f 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -88,7 +88,7 @@ def __init__( ), "Hybrid Sharding (FSDP/DP) using --sharding_size is not yet supported for Thunder. Coming soon." assert ( - self.shard_mode == "hybrid_dp" + self.shard_mode in ["hybrid_zero2", "hybrid_zero3"] ), "Sharding Size is only used with Hybrid FSDP/DP style parallelism. Please " assert ( @@ -239,7 +239,8 @@ def setup_distributed(self): sharding_strategy: ShardingStrategy = { "zero2": ShardingStrategy.SHARD_GRAD_OP, "zero3": ShardingStrategy.FULL_SHARD, - "hybrid_dp": ShardingStrategy.HYBRID_SHARD, + "hybrid_zero2": ShardingStrategy._HYBRID_SHARD_ZERO2, + "hybrid_zero3": ShardingStrategy.HYBRID_SHARD, }[self.shard_mode] # AssertionError: Dynamo only supports FSDP with use_orig_params=True From 7bd677a1883feb5a5eed7b02bbf5dbd89bd85fb9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 26 Mar 2024 23:40:05 +0000 Subject: [PATCH 09/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- thunder/benchmarks/benchmark_litgpt.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 6c0e48943f..31bfb704ae 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -87,9 +87,10 @@ def __init__( "thunder" not in self.compile ), "Hybrid Sharding (FSDP/DP) using --sharding_size is not yet supported for Thunder. Coming soon." - assert ( - self.shard_mode in ["hybrid_zero2", "hybrid_zero3"] - ), "Sharding Size is only used with Hybrid FSDP/DP style parallelism. Please " + assert self.shard_mode in [ + "hybrid_zero2", + "hybrid_zero3", + ], "Sharding Size is only used with Hybrid FSDP/DP style parallelism. Please " assert ( world_size % self.sharding_size == 0 From c3d013276fd6c9663cc587547be527c78fd967d1 Mon Sep 17 00:00:00 2001 From: parthmannan <38387286+parthmannan@users.noreply.github.com> Date: Wed, 27 Mar 2024 12:49:59 -0700 Subject: [PATCH 10/11] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Carlos MocholĂ­ --- thunder/benchmarks/benchmark_litgpt.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 31bfb704ae..5a568491b2 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -94,7 +94,7 @@ def __init__( assert ( world_size % self.sharding_size == 0 - ), f"World size {world_size} is not divisible by Hybrid Sharding Size {self.sharding_size}" + ), f"World size {world_size} is not divisible by the sharding size {self.sharding_size}" if self.bucketing_mode != "none" and self.distributed_mode != "fsdp": print( @@ -109,12 +109,12 @@ def __init__( if self.fsdp_bucket_params is not None: if self.distributed_mode != "fsdp": print( - f"[WARNING] Found --fsdp_bucket_params but Distributed mode is {self.distributed_mode}. Will be ignnored" + f"[WARNING] Found --fsdp_bucket_params but Distributed mode is {self.distributed_mode}. Will be ignored" ) if self.bucketing_mode != "size": print( - f"[WARNING] Bucketing mode is set to {self.bucketing_mode}. 
--fsdp_bucket_params will be ignoted." + f"[WARNING] Bucketing mode is set to {self.bucketing_mode}. --fsdp_bucket_params will be ignored." ) if global_batch_size is not None: From bf03ae9326e353f91447fa670dadbb7bf8671e2e Mon Sep 17 00:00:00 2001 From: Parth Mannan Date: Wed, 27 Mar 2024 12:52:04 -0700 Subject: [PATCH 11/11] Fix unfinished sentence --- thunder/benchmarks/benchmark_litgpt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thunder/benchmarks/benchmark_litgpt.py b/thunder/benchmarks/benchmark_litgpt.py index 6c0e48943f..973de04e6f 100644 --- a/thunder/benchmarks/benchmark_litgpt.py +++ b/thunder/benchmarks/benchmark_litgpt.py @@ -89,7 +89,7 @@ def __init__( assert ( self.shard_mode in ["hybrid_zero2", "hybrid_zero3"] - ), "Sharding Size is only used with Hybrid FSDP/DP style parallelism. Please " + ), "Sharding Size is only used with Hybrid FSDP/DP style parallelism." assert ( world_size % self.sharding_size == 0
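
A minimal usage sketch for the options added in this series, assuming benchmark_main forwards its **kwargs to the Benchmark_litGPT constructor and the script is launched under torchrun so WORLD_SIZE/LOCAL_RANK/RANK are set. Argument names are taken from the diffs above; the driver file name and the compile value are illustrative assumptions, not taken from the patches:

    # hypothetical driver, run with e.g.: torchrun --nproc_per_node=8 run_bench.py
    from thunder.benchmarks.benchmark_litgpt import benchmark_main

    benchmark_main(
        compile="eager",            # hybrid sharding via sharding_size is asserted unsupported for Thunder compile paths
        distributed_mode="fsdp",
        shard_mode="hybrid_zero3",  # maps to ShardingStrategy.HYBRID_SHARD in setup_distributed()
        sharding_size=8,            # shard within groups of 8 ranks, replicate across world_size / 8 groups
        bucketing_mode="block",     # wrap per transformer Block; "size" uses fsdp_bucket_params instead
    )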