Merged
103 changes: 52 additions & 51 deletions test/dtypes/test_nf4.py
@@ -43,7 +43,7 @@
to_nf4,
)
from torchao.testing.utils import skip_if_rocm
from torchao.utils import torch_version_at_least
from torchao.utils import get_current_accelerator_device, torch_version_at_least

bnb_available = False

@@ -57,6 +57,7 @@
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
_DEVICE = get_current_accelerator_device()


def _build_input_weight(embed_dim: int, device: torch.device, dtype: torch.dtype):
@@ -68,7 +69,7 @@ def _build_input_weight(embed_dim: int, device: torch.device, dtype: torch.dtype

def _build_bnb_linear(input_weight, device):
assert bnb_available, "Needs bitsandbytes support"
param = bnb.nn.Params4bit(input_weight, requires_grad=False, quant_type="nf4").cuda(
param = bnb.nn.Params4bit(input_weight, requires_grad=False, quant_type="nf4").to(
device
)
bnb_linear = bnb.nn.LinearNF4(
@@ -121,7 +122,7 @@ def test_backward_dtype_match(self, dtype: torch.dtype):
assert nf4_tensor.grad is None

@unittest.skipIf(not bnb_available, "Need bnb availble")
@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
@unittest.skipIf(
torch_version_at_least("2.7.0"), reason="Failing in CI"
) # TODO: fix this
@@ -130,7 +131,7 @@ def test_backward_dtype_match(self, dtype: torch.dtype):
def test_reconstruction_qlora_vs_bnb(self, dtype: torch.dtype):
# From https://github.com/drisspg/transformer_nuggets/blob/f05afad68ad9086d342268f46a7f344617a02314/test/test_qlora.py#L65C1-L81C47
torch.manual_seed(0)
device = "cuda"
device = _DEVICE
embed_dim = 512
input_weight = _build_input_weight(embed_dim, device, dtype)
nf4_weight = to_nf4(input_weight)
@@ -147,7 +148,7 @@ def test_reconstruction_qlora_vs_bnb(self, dtype: torch.dtype):
assert (nugs_diff - bnb_diff).abs() < 2e-1

@unittest.skipIf(not bnb_available, "Need bnb availble")
@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
@skip_if_rocm("ROCm enablement in progress")
@unittest.skipIf(
torch_version_at_least("2.7.0"), reason="Failing in CI"
@@ -160,12 +161,12 @@ def test_nf4_bnb_linear(self, dtype: torch.dtype):
"""
torch.manual_seed(0)
dim = 512
device = "cuda"
device = _DEVICE
input_weight = _build_input_weight(dim, device, dtype)
nf4_weight = to_nf4(input_weight)
bnb_linear = _build_bnb_linear(input_weight, device)

inp = torch.randn(2, 512, dtype=dtype, device="cuda")
inp = torch.randn(2, 512, dtype=dtype, device=_DEVICE)

out_nf4 = linear_nf4(inp, nf4_weight).sum()
out_bnb = bnb_linear(inp).sum()
@@ -176,11 +177,11 @@ def test_nf4_bnb_linear(self, dtype: torch.dtype):
assert err_native < 0.5 * dim
assert err_bnb < 0.5 * dim

@unittest.skipIf(not torch.cuda.is_available(), "Need cuda for test")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU for test")
@parametrize("dtype", [torch.bfloat16, torch.float16, torch.float32])
def test_load_from_state_dicts(self, dtype: torch.dtype):
"""Tests loading to and from different module state dicts"""
input_tensor = torch.rand(64, device="cuda", dtype=dtype)
input_tensor = torch.rand(64, device=_DEVICE, dtype=dtype)
base_mod = self.TestMod(input_tensor, 32, 2)

dummy_dict = {"param": input_tensor}
@@ -222,27 +223,27 @@ def test_to_copy(self, dtype: torch.dtype):
nf4_to_dtype = input_tensor_nf4.to(dtype)
torch.testing.assert_allclose(input_tensor, nf4_to_dtype, atol=0.13, rtol=0.13)

if torch.cuda.is_available():
input_tensor = torch.rand(128, device="cuda")
if torch.accelerator.is_available():
input_tensor = torch.rand(128, device=_DEVICE)
input_tensor_nf4 = to_nf4(input_tensor, 32, 2)
nf4_to_dtype = input_tensor_nf4.to(dtype)
torch.testing.assert_allclose(
input_tensor, nf4_to_dtype, atol=0.13, rtol=0.13
)

@unittest.skipIf(not torch.cuda.is_available(), "Need cuda for test")
@unittest.skipIf(not torch.accelerator.is_available(), "Need gpu for test")
def test_to_copy_device(self):
input_tensor = torch.rand(128, device="cpu")
t = to_nf4(input_tensor, 32, 2)
assert t.device == torch.device("cpu")
z = t.cuda()
assert z.device.type == "cuda" # Because the device could be cuda:0
z = t.to(_DEVICE)
assert z.device.type == _DEVICE.type # Because the device could be cuda:0
x = z.cpu()
assert x.device == torch.device("cpu")

input_tensor = torch.rand(128, device="cuda")
input_tensor = torch.rand(128, device=_DEVICE)
t = to_nf4(input_tensor, 32, 2)
assert t.device.type == "cuda"
assert t.device.type == _DEVICE.type

@parametrize("dtype", [torch.bfloat16, torch.float16, torch.float32])
def test_to_dtype(self, dtype: torch.dtype):
@@ -252,10 +253,10 @@ def test_to_dtype(self, dtype: torch.dtype):
assert type(input_tensor_nf4.to(dtype)) is torch.Tensor
assert input_tensor_nf4.to(dtype).dtype is dtype

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
@parametrize("dtype", [torch.bfloat16, torch.float16, torch.float32])
def test_smoketest_linear(self, dtype: torch.dtype):
a = torch.randn(32, 32, dtype=dtype, device="cuda")
a = torch.randn(32, 32, dtype=dtype, device=_DEVICE)
a_nf4 = torchao.dtypes.to_nf4(a, 16, 2)
inp = torch.randn(2, 32, 32, dtype=a.dtype, device=a.device)
_ = torch.nn.functional.linear(inp, a)
@@ -272,37 +273,37 @@ def test_smoketest_linear_compile(self, dtype: torch.dtype):
self.skipTest("test requires SM capability of at least (8, 0).")
if version.parse(torch.__version__) < version.parse("2.3.0"):
self.skipTest("test requires 2.3.0 and above for tracing NF4Tensor")
a = torch.randn(32, 32, dtype=dtype, device="cuda")
a = torch.randn(32, 32, dtype=dtype, device=_DEVICE)
a_nf4 = torchao.dtypes.to_nf4(a, 16, 2)
inp = torch.randn(2, 32, 32, dtype=a.dtype, device=a.device)
_ = torch.compile(torch.nn.functional.linear, mode="max-autotune")(inp, a_nf4)

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
@parametrize("dtype", [torch.bfloat16, torch.float16, torch.float32])
@parametrize("shape", [(16, 16), (32, 16)])
@parametrize("chunk_size", [8, 16, 32])
def test_chunk_size_equivalence(self, dtype: torch.dtype, shape, chunk_size):
a = torch.randn(shape, device="cuda", dtype=dtype)
a = torch.randn(shape, device=_DEVICE, dtype=dtype)
with unittest.mock.patch("torchao.dtypes.nf4tensor.CHUNK_SIZE", chunk_size):
nf4_patched = to_nf4(a, 16, 2)
# This will be essentially no chunking since the numel is alot smaller than default chunk_size
nf4_base = to_nf4(a, 16, 2)

torch.testing.assert_close(nf4_patched.quantized_data, nf4_base.quantized_data)

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
@parametrize("input_size", [(512 * 512,), (512, 512)])
def test_empty_like(self, input_size: Union[Tuple[int], int]):
nf4_tensor = to_nf4(torch.rand(input_size, device="cuda"))
nf4_tensor = to_nf4(torch.rand(input_size, device=_DEVICE))
new_tensor = torch.empty_like(nf4_tensor, device="cpu")
self.assertTrue(isinstance(new_tensor, NF4Tensor))
self.assertEqual(new_tensor.get_device(), -1) # that it's on CPU
self.assertEqual(new_tensor.size(), nf4_tensor.size())

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
@parametrize("compile", [False, True])
def test_quantize_api(self, compile):
nf4_linear = nn.Linear(512, 512, device="cuda")
nf4_linear = nn.Linear(512, 512, device=_DEVICE)
torchao.quantize_(nf4_linear, nf4_weight_only())
assert isinstance(nf4_linear.weight, NF4Tensor)

@@ -313,14 +314,14 @@ def test_quantize_api(self, compile):
nf4_linear.compile()
ref_linear.compile()

nf4_x = torch.randn(2, 512, device="cuda").requires_grad_()
nf4_x = torch.randn(2, 512, device=_DEVICE).requires_grad_()
ref_x = nf4_x.detach().clone().requires_grad_()

nf4_out = nf4_linear(nf4_x)
ref_out = ref_linear(ref_x)
self.assertEqual(nf4_out, ref_out)

grad_out = torch.randn(2, 512, device="cuda")
grad_out = torch.randn(2, 512, device=_DEVICE)
nf4_out.backward(grad_out)
ref_out.backward(grad_out)
self.assertEqual(nf4_x.grad, ref_x.grad)
@@ -511,60 +512,60 @@ def test_tensor_as_strided_invalid(self, input_size: Union[Tuple[int], int]):
nf4_tensor, nf4_tensor.size(), stride, nf4_tensor.storage_offset()
)

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPu available")
def test_pin_memory(self):
nf4_tensor = to_nf4(torch.randn(512 * 512))
self.assertFalse(nf4_tensor.is_pinned())

nf4_tensor = nf4_tensor.pin_memory()
self.assertTrue(nf4_tensor.is_pinned())

nf4_tensor = to_nf4(torch.randn(512 * 512, device="cuda"))
nf4_tensor = to_nf4(torch.randn(512 * 512, device=_DEVICE))
self.assertFalse(nf4_tensor.is_pinned())

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
def test_to_cuda(self):
nf4_tensor = to_nf4(torch.randn(512 * 512))
self.assertEqual(nf4_tensor.device.type, "cpu")
nf4_tensor = nf4_tensor.to("cuda", non_blocking=True)
self.assertEqual(nf4_tensor.device.type, "cuda")
nf4_tensor = nf4_tensor.to(_DEVICE, non_blocking=True)
self.assertEqual(nf4_tensor.device.type, _DEVICE.type)
self.assertEqual(type(nf4_tensor), NF4Tensor)
nf4_tensor.get_original_weight() # make sure we can dequantize

nf4_tensor = to_nf4(torch.randn(512 * 512))
self.assertEqual(nf4_tensor.device.type, "cpu")
nf4_tensor = nf4_tensor.to("cuda")
self.assertEqual(nf4_tensor.device.type, "cuda")
nf4_tensor = nf4_tensor.to(_DEVICE)
self.assertEqual(nf4_tensor.device.type, _DEVICE.type)
self.assertEqual(type(nf4_tensor), NF4Tensor)
nf4_tensor.get_original_weight()

nf4_tensor = to_nf4(torch.randn(512 * 512))
self.assertEqual(nf4_tensor.device.type, "cpu")
nf4_tensor = nf4_tensor.to("cuda", torch.bfloat16)
self.assertEqual(nf4_tensor.device.type, "cuda")
nf4_tensor = nf4_tensor.to(_DEVICE, torch.bfloat16)
self.assertEqual(nf4_tensor.device.type, _DEVICE.type)
self.assertEqual(nf4_tensor.dtype, torch.bfloat16)
self.assertEqual(type(nf4_tensor), torch.Tensor) # dequantized

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
def test_to_cpu(self):
nf4_tensor = to_nf4(torch.randn(512 * 512, device="cuda"))
nf4_tensor = to_nf4(torch.randn(512 * 512, device=_DEVICE))
nf4_tensor = nf4_tensor.cpu()
self.assertEqual(nf4_tensor.device.type, "cpu")
for attr in _INNER_TENSOR_NAMES_FOR_SHARDING:
inner_tensor = getattr(nf4_tensor, attr)
self.assertEqual(inner_tensor.device.type, "cpu")
nf4_tensor.get_original_weight() # make sure we can dequantize

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
def test_to_module(self):
linear = nn.Linear(512, 512, bias=False)
linear.weight = nn.Parameter(
to_nf4(linear.weight.detach()), requires_grad=False
)
linear.cuda()
self.assertEqual(linear.weight.device.type, "cuda")
linear.to(_DEVICE)
self.assertEqual(linear.weight.device.type, _DEVICE.type)
weight = linear.weight.get_original_weight()
self.assertEqual(weight.device.type, "cuda")
self.assertEqual(weight.device.type, _DEVICE.type)

linear.cpu()
self.assertEqual(linear.weight.device.type, "cpu")
@@ -575,20 +576,20 @@ def test_to_module(self):
linear.weight = nn.Parameter(
to_nf4(linear.weight.detach()), requires_grad=False
)
linear.to("cuda")
self.assertEqual(linear.weight.device.type, "cuda")
linear.to(_DEVICE)
self.assertEqual(linear.weight.device.type, _DEVICE.type)
weight = linear.weight.get_original_weight()
self.assertEqual(weight.device.type, "cuda")
self.assertEqual(weight.device.type, _DEVICE.type)

linear.to("cpu")
self.assertEqual(linear.weight.device.type, "cpu")
weight = linear.weight.get_original_weight()
self.assertEqual(weight.device.type, "cpu")

@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
@parametrize("input_size", [512 * 512, (512 * 512,), (512, 512)])
def test_tensor_deepcopy(self, input_size: Union[Tuple[int], int]):
nf4_orig = to_nf4(torch.randn(input_size, device="cuda"))
nf4_orig = to_nf4(torch.randn(input_size, device=_DEVICE))
nf4_clone = copy.deepcopy(nf4_orig)
self.assertEqual(
nf4_clone.get_original_weight(), nf4_orig.get_original_weight()
@@ -678,7 +679,7 @@ def _test_qlora_fsdp2(
dropout_p=0,
)
torch.manual_seed(42)
with torch.device("cuda"):
with torch.device(_DEVICE):
base_model = Transformer(model_args)
for layer in base_model.layers:
# attention with lora adapters
@@ -732,7 +733,7 @@ def _test_qlora_fsdp2(

torch.manual_seed(42 + self.rank + 1)
for iter_idx in range(5):
inp = torch.randint(0, vocab_size, (batch_size, seq_len), device="cuda")
inp = torch.randint(0, vocab_size, (batch_size, seq_len), device=_DEVICE)
fsdp_optim.zero_grad(set_to_none=(iter_idx % 2 == 0))
fsdp_loss = fsdp_model(inp).sum()
fsdp_loss.backward()
@@ -756,7 +757,7 @@ def world_size(self) -> int:
return 2

@skip_if_lt_x_gpu(2)
@unittest.skipIf(not torch.cuda.is_available(), "Need CUDA available")
@unittest.skipIf(not torch.accelerator.is_available(), "Need GPU available")
def test_comm(self):
self.run_subtests(
{"input_size": [512, 2048]},
@@ -767,7 +768,7 @@ def _test_comm(self, input_size: int):
from torch.distributed._composable.fsdp import fully_shard
from torch.distributed._tensor import distribute_tensor

model = nn.Linear(input_size, input_size, device="cuda")
model = nn.Linear(input_size, input_size, device=_DEVICE)
origin_tensor = model.weight
origin_nf4_tensor = to_nf4(origin_tensor)
model = fully_shard(model)
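
For context on the pattern applied throughout this diff: hard-coded "cuda" devices and torch.cuda.is_available() checks are replaced by a module-level _DEVICE obtained from torchao.utils.get_current_accelerator_device, so the tests run on whichever accelerator backend is present. Below is a minimal sketch of that pattern, assuming the helper simply wraps the torch.accelerator API (PyTorch 2.6+) and falls back to CPU; the get_current_accelerator_device shown here is an illustrative stand-in, not the actual torchao implementation.

import torch

def get_current_accelerator_device() -> torch.device:
    # Illustrative stand-in (assumption): prefer whatever accelerator backend
    # PyTorch detects (cuda, xpu, mps, ...), otherwise fall back to CPU.
    if torch.accelerator.is_available():
        return torch.accelerator.current_accelerator()
    return torch.device("cpu")

_DEVICE = get_current_accelerator_device()

# Tensors are then created against _DEVICE instead of "cuda", and skip
# conditions use torch.accelerator.is_available() rather than
# torch.cuda.is_available(), matching the edits in this diff.
x = torch.randn(2, 512, device=_DEVICE)
assert x.device.type == _DEVICE.type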